You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by bu...@apache.org on 2010/07/27 22:33:06 UTC

svn commit: r979856 - in /lucene/dev/branches/realtime_search/lucene/src: java/org/apache/lucene/index/ java/org/apache/lucene/store/ test/org/apache/lucene/index/

Author: buschmi
Date: Tue Jul 27 20:33:05 2010
New Revision: 979856

URL: http://svn.apache.org/viewvc?rev=979856&view=rev
Log:
LUCENE-2561: Fix most of the thread-safety and exception handling problems in the realtime search branch

Added:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java
Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Jul 27 20:33:05 2010
@@ -2,13 +2,13 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +24,8 @@ import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.BytesRef;
 
 /**
@@ -51,7 +53,7 @@ final class DocumentsWriter {
   private final DocumentsWriterThreadPool threadPool;
   private final Lock sequenceIDLock = new ReentrantLock();
 
-  private final Directory directory;
+  private final Directory openFilesTrackingDirectory;
   final IndexWriter indexWriter;
   final IndexWriterConfig config;
 
@@ -69,7 +71,14 @@ final class DocumentsWriter {
   private Map<DocumentsWriterPerThread, Long> minSequenceIDsPerThread = new HashMap<DocumentsWriterPerThread, Long>();
 
   public DocumentsWriter(Directory directory, IndexWriter indexWriter, IndexWriterConfig config) {
-    this.directory = directory;
+    this.openFilesTrackingDirectory = new FilterDirectory(directory) {
+      @Override public IndexOutput createOutput(final String name) throws IOException {
+        addOpenFile(name);
+        return super.createOutput(name);
+      }
+    };
+
+    //this.openFilesTrackingDirectory = directory;
     this.indexWriter = indexWriter;
     this.config = config;
     this.maxBufferedDocs = config.getMaxBufferedDocs();
@@ -111,7 +120,7 @@ final class DocumentsWriter {
   }
 
   DocumentsWriterPerThread newDocumentsWriterPerThread() {
-    DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(directory, this, config
+    DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(openFilesTrackingDirectory, this, config
         .getIndexingChain());
     sequenceIDLock.lock();
     try {
@@ -127,13 +136,23 @@ final class DocumentsWriter {
     return updateDocument(null, doc, analyzer);
   }
 
+  private final static class UpdateResult {
+    long sequenceID;
+    boolean flushed;
+    
+    UpdateResult(long sequenceID) {
+      this.sequenceID = sequenceID;
+      flushed = false;
+    }
+  }
+  
   long updateDocument(final Term delTerm, final Document doc, final Analyzer analyzer)
       throws CorruptIndexException, IOException {
 
-    long seqID = threadPool.executePerThread(this, doc,
-        new DocumentsWriterThreadPool.PerThreadTask<Long>() {
+    UpdateResult result = threadPool.executePerThread(this, doc,
+        new DocumentsWriterThreadPool.PerThreadTask<UpdateResult>() {
           @Override
-          public Long process(final DocumentsWriterPerThread perThread) throws IOException {
+          public UpdateResult process(final DocumentsWriterPerThread perThread) throws IOException {
             long perThreadRAMUsedBeforeAdd = perThread.numBytesUsed;
             perThread.addDocument(doc, analyzer);
 
@@ -154,16 +173,23 @@ final class DocumentsWriter {
               sequenceIDLock.unlock();
             }
 
+            UpdateResult result = new UpdateResult(sequenceID);
             if (finishAddDocument(perThread, perThreadRAMUsedBeforeAdd)) {
+              result.flushed = true;
               super.clearThreadBindings();
             }
-            return sequenceID;
+            return result;
           }
         });
+        
+    if (result == null) {
+      return -1;
+    }
     
-    indexWriter.maybeMerge();
-    
-    return seqID;
+    if (result.flushed) {
+      indexWriter.maybeMerge();
+    }
+    return result.sequenceID;
   }
 
   private final boolean finishAddDocument(DocumentsWriterPerThread perThread,
@@ -257,17 +283,12 @@ final class DocumentsWriter {
 
   final boolean flushAllThreads(final boolean flushDeletes)
       throws IOException {
-    return threadPool.executeAllThreads(new DocumentsWriterThreadPool.AllThreadsTask<Boolean>() {
+    
+    return threadPool.executeAllThreads(this, new DocumentsWriterThreadPool.AllThreadsTask<Boolean>() {
       @Override
       public Boolean process(Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException {
         boolean anythingFlushed = false;
         
-        if (flushDeletes) {
-          if (applyDeletes(indexWriter.segmentInfos)) {
-            indexWriter.checkpoint();
-          }
-        }
-
         while (threadsIterator.hasNext()) {
           DocumentsWriterPerThread perThread = threadsIterator.next();
           final int numDocs = perThread.getNumDocsInRAM();
@@ -282,6 +303,7 @@ final class DocumentsWriter {
     
           if (flushDocs) {
             SegmentInfo newSegment = perThread.flush();
+            newSegment.dir = indexWriter.getDirectory();
             
             if (newSegment != null) {
               anythingFlushed = true;
@@ -315,10 +337,8 @@ final class DocumentsWriter {
   }
 
   /** Build compound file for the segment we just flushed */
-  void createCompoundFile(String segment, DocumentsWriterPerThread perThread) throws IOException {
-    
-    CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, 
-        IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION));
+  void createCompoundFile(String compoundFileName, DocumentsWriterPerThread perThread) throws IOException {
+    CompoundFileWriter cfsWriter = new CompoundFileWriter(openFilesTrackingDirectory, compoundFileName);
     for(String fileName : perThread.flushState.flushedFiles) {
       cfsWriter.addFile(fileName);
     }
@@ -327,49 +347,53 @@ final class DocumentsWriter {
     cfsWriter.close();
   }
 
-  // nocommit
   void finishFlushedSegment(SegmentInfo newSegment, DocumentsWriterPerThread perThread) throws IOException {
-    synchronized(indexWriter) {
-      indexWriter.segmentInfos.add(newSegment);
-      indexWriter.checkpoint();
+    SegmentReader reader = indexWriter.readerPool.get(newSegment, false);
+    try {
+      applyDeletes(reader, newSegment.getMinSequenceID(), newSegment.getMaxSequenceID(), perThread.sequenceIDs);
+    } finally {
+      indexWriter.readerPool.release(reader);
+    }
     
-      SegmentReader reader = indexWriter.readerPool.get(newSegment, false);
-      boolean any = false;
+    if (indexWriter.useCompoundFile(newSegment)) {
+      String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+      message("creating compound file " + compoundFileName);
+      // Now build compound file
+      boolean success = false;
       try {
-        any = applyDeletes(reader, newSegment.getMinSequenceID(), newSegment.getMaxSequenceID(), perThread.sequenceIDs);
+        createCompoundFile(compoundFileName, perThread);
+        success = true;
       } finally {
-        indexWriter.readerPool.release(reader);
-      }
-      if (any) {
-        indexWriter.checkpoint();
-      }
-  
-      if (indexWriter.mergePolicy.useCompoundFile(indexWriter.segmentInfos, newSegment)) {
-        // Now build compound file
-        boolean success = false;
-        try {
-          createCompoundFile(newSegment.name, perThread);
-          success = true;
-        } finally {
-          if (!success) {
-            if (infoStream != null) {
-              message("hit exception " +
-              		"reating compound file for newly flushed segment " + newSegment.name);
-            }
-            indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "", 
-                IndexFileNames.COMPOUND_FILE_EXTENSION));
+        if (!success) {
+          if (infoStream != null) {
+            message("hit exception " +
+            		"reating compound file for newly flushed segment " + newSegment.name);
           }
+          indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "", 
+              IndexFileNames.COMPOUND_FILE_EXTENSION));
+          for (String file : perThread.flushState.flushedFiles) {
+            indexWriter.deleter.deleteFile(file);
+          }
+
         }
-  
-        synchronized(indexWriter) {
-          newSegment.setUseCompoundFile(true);
-          indexWriter.checkpoint();
-          // In case the files we just merged into a CFS were
-          // not previously checkpointed:
-          indexWriter.deleter.deleteNewFiles(perThread.closedFiles());
-        }
+      }
+      
+      for (String file : perThread.flushState.flushedFiles) {
+        indexWriter.deleter.deleteFile(file);
+      }
+
+      newSegment.setUseCompoundFile(true);
+      
+      synchronized(openFiles) {
+        openFiles.remove(compoundFileName);
       }
     }
+    
+    synchronized(openFiles) {
+      openFiles.removeAll(perThread.flushState.flushedFiles);
+    }
+    
+    indexWriter.addNewSegment(newSegment);
   }
   
   // Returns true if an abort is in progress
@@ -400,6 +424,7 @@ final class DocumentsWriter {
     if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
       flushSegment(perThread);
       assert perThread.getNumDocsInRAM() == 0;
+      
       return true;
     }
 
@@ -413,48 +438,57 @@ final class DocumentsWriter {
     }
 
     SegmentInfo newSegment = perThread.flush();
+    newSegment.dir = indexWriter.getDirectory();
     
-    if (newSegment != null) {
-      finishFlushedSegment(newSegment, perThread);
-      return true;
-    }
-    return false;
+    finishFlushedSegment(newSegment, perThread);
+    return true;
   }
 
   void abort() throws IOException {
-    threadPool.abort();
-    try {
-      try {
-        abortedFiles = openFiles();
-      } catch (Throwable t) {
-        abortedFiles = null;
+    threadPool.abort(new DocumentsWriterThreadPool.AbortTask() {
+      
+      @Override
+      void abort() throws IOException {
+        try {
+          abortedFiles = openFiles();
+        } catch (Throwable t) {
+          abortedFiles = null;
+        }
+    
+        deletesInRAM.clear();
+        // nocommit
+    //        deletesFlushed.clear();
+    
+        openFiles.clear();
+        deletesInRAM.clear();
       }
-  
-      deletesInRAM.clear();
-      // nocommit
-  //        deletesFlushed.clear();
-  
-      openFiles.clear();
-    } finally {
-      threadPool.finishAbort();
-    }
-
+    });
   }
 
-  final List<String> openFiles = new ArrayList<String>();
+  final Set<String> openFiles = new HashSet<String>();
   private Collection<String> abortedFiles; // List of files that were written before last abort()
-
   /*
    * Returns Collection of files in use by this instance,
    * including any flushed segments.
    */
   @SuppressWarnings("unchecked")
-  List<String> openFiles() {
+  private Collection<String> openFiles() {
     synchronized(openFiles) {
-      return (List<String>) ((ArrayList<String>) openFiles).clone();
+      return (Set<String>) ((HashSet<String>) openFiles).clone();
     }
   }
 
+  void addOpenFile(String file) {
+    synchronized(openFiles) {
+      openFiles.add(file);
+    }
+  }
+
+  void removeOpenFile(String file) {
+    synchronized(openFiles) {
+      openFiles.remove(file);
+    }
+  }
   
   Collection<String> abortedFiles() {
     return abortedFiles;
@@ -480,27 +514,29 @@ final class DocumentsWriter {
   // }
 
   boolean applyDeletes(SegmentInfos infos) throws IOException {
-    synchronized(indexWriter) {
-      if (!hasDeletes())
-        return false;
-  
-      final long t0 = System.currentTimeMillis();
-  
-      if (infoStream != null) {
-        message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " +
-                +infos.size() + " segments.");
-      }
-  
-      final int infosEnd = infos.size();
-  
-      boolean any = false;
-      for (int i = 0; i < infosEnd; i++) {
-  
-        // Make sure we never attempt to apply deletes to
-        // segment in external dir
-        assert infos.info(i).dir == directory;
-  
-        SegmentInfo si = infos.info(i);
+    if (!hasDeletes())
+      return false;
+
+    final long t0 = System.currentTimeMillis();
+
+    if (infoStream != null) {
+      message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " +
+              +infos.size() + " segments.");
+    }
+
+    final int infosEnd = infos.size();
+
+    boolean any = false;
+    for (int i = 0; i < infosEnd; i++) {
+
+      // Make sure we never attempt to apply deletes to
+      // segment in external dir
+      assert infos.info(i).dir == indexWriter.getDirectory();
+
+      SegmentInfo si = infos.info(i);
+      // we have to synchronize here, because we need a write lock on
+      // the segment in order to apply deletes
+      synchronized (indexWriter) {
         SegmentReader reader = indexWriter.readerPool.get(si, false);
         try {
           any |= applyDeletes(reader, si.getMinSequenceID(), si.getMaxSequenceID(), null);
@@ -508,13 +544,13 @@ final class DocumentsWriter {
           indexWriter.readerPool.release(reader);
         }
       }
-  
-      if (infoStream != null) {
-        message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec");
-      }
-  
-      return any;
     }
+
+    if (infoStream != null) {
+      message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec");
+    }
+
+    return any;
   }
 
   // Apply buffered delete terms, queries and docIDs to the
@@ -642,9 +678,6 @@ final class DocumentsWriter {
   }
 
   void message(String message) {
-    if (infoStream != null) {
-      indexWriter.message("DW: " + message);
-    }
+    indexWriter.message("DW: " + message);
   }
-
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue Jul 27 20:33:05 2010
@@ -1,15 +1,32 @@
 package org.apache.lucene.index;
 
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.codecs.Codec;
 import org.apache.lucene.search.Similarity;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.ArrayUtil;
 
 public class DocumentsWriterPerThread {
@@ -84,6 +101,7 @@ public class DocumentsWriterPerThread {
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
   void abort() throws IOException {
+    assert aborting;
     try {
       if (infoStream != null) {
         message("docWriter: now abort");
@@ -124,11 +142,11 @@ public class DocumentsWriterPerThread {
   SegmentWriteState flushState;
 
   long[] sequenceIDs = new long[8];
-  final List<String> closedFiles = new ArrayList<String>();
   
   long numBytesUsed;
   
   public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
+    parent.indexWriter.testPoint("DocumentsWriterPerThread.init start");
     this.directory = directory;
     this.parent = parent;
     this.writer = parent.indexWriter;
@@ -141,7 +159,6 @@ public class DocumentsWriterPerThread {
     if (consumer instanceof DocFieldProcessor) {
       docFieldProcessor = (DocFieldProcessor) consumer;
     }
-
   }
   
   void setAborting() {
@@ -288,33 +305,11 @@ public class DocumentsWriterPerThread {
     return segment;
   }
   
-  @SuppressWarnings("unchecked")
-  List<String> closedFiles() {
-    return (List<String>) ((ArrayList<String>) closedFiles).clone();
-  }
-
-  
-  void addOpenFile(String name) {
-    synchronized(parent.openFiles) {
-      assert !parent.openFiles.contains(name);
-      parent.openFiles.add(name);
-    }
-  }
-
-  void removeOpenFile(String name) {
-    synchronized(parent.openFiles) {
-      assert parent.openFiles.contains(name);
-      parent.openFiles.remove(name);
-    }
-    closedFiles.add(name);
-  }
-  
   void bytesUsed(long numBytes) {
     ramAllocator.bytesUsed(numBytes);
   }
   
   void message(String message) {
-    if (infoStream != null)
-      writer.message("DW: " + message);
+    writer.message("DW: " + message);
   }
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Tue Jul 27 20:33:05 2010
@@ -1,5 +1,22 @@
 package org.apache.lucene.index;
 
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.locks.Condition;
@@ -30,6 +47,10 @@ abstract class DocumentsWriterThreadPool
     abstract T process(final Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException;
   }
 
+  public static abstract class AbortTask {
+    abstract void abort() throws IOException;
+  }
+  
   protected abstract static class ThreadState {
     private DocumentsWriterPerThread perThread;
     private boolean isIdle = true;
@@ -95,11 +116,29 @@ abstract class DocumentsWriterThreadPool
     return true;
   }
   
-  void abort() throws IOException {
-    pauseAllThreads();
-    aborting = true;
-    for (ThreadState state : allThreadStates) {
-      state.perThread.abort();
+  void abort(AbortTask task) throws IOException {
+    lock.lock();
+    try {
+      if (!aborting) {
+        aborting = true;
+        pauseAllThreads();
+        for (ThreadState state : allThreadStates) {
+          state.perThread.aborting = true;
+        }
+
+        try {
+          for (ThreadState state : allThreadStates) {
+            state.perThread.abort();
+          }
+          
+          task.abort();
+        } finally {
+          aborting = false;
+          resumeAllThreads();
+        }
+      }
+    } finally {
+      lock.unlock();
     }
   }
   
@@ -108,7 +147,7 @@ abstract class DocumentsWriterThreadPool
     resumeAllThreads();
   }
 
-  public <T> T executeAllThreads(AllThreadsTask<T> task) throws IOException {
+  public <T> T executeAllThreads(DocumentsWriter documentsWriter, AllThreadsTask<T> task) throws IOException {
     T result = null;
     
     lock.lock();
@@ -120,19 +159,21 @@ abstract class DocumentsWriterThreadPool
       } catch (InterruptedException ie) {
         throw new ThreadInterruptedException(ie);
       }
-      
-      pauseAllThreads();
+
+      assert !globalLock;
       globalLock = true;
+
+      pauseAllThreads();
+      
     } finally {
       lock.unlock();
     }
 
+    final ThreadState[] localAllThreads = allThreadStates;
     
     // all threads are idle now
-    
+    boolean success = false;
     try {
-      final ThreadState[] localAllThreads = allThreadStates;
-      
       result = task.process(new Iterator<DocumentsWriterPerThread>() {
         int i = 0;
   
@@ -151,8 +192,18 @@ abstract class DocumentsWriterThreadPool
           throw new UnsupportedOperationException("remove() not supported.");
         }
       });
+      success = true;
       return result;
     } finally {
+      boolean abort = false;
+      if (!success) {
+        for (ThreadState state : localAllThreads) {
+          if (state.perThread.aborting) {
+            abort = true;
+          }
+        }
+      }
+      
       lock.lock();
       try {
         try {
@@ -168,6 +219,10 @@ abstract class DocumentsWriterThreadPool
         lock.unlock();
       }
       
+      if (!aborting && abort) {
+        documentsWriter.abort();
+      }
+      
     }
   }
 
@@ -182,13 +237,12 @@ abstract class DocumentsWriterThreadPool
     } finally {
       boolean abort = false;
       if (!success && state.perThread.aborting) {
-        state.perThread.aborting = false;
         abort = true;
       }
 
       returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
       
-      if (abort) {
+      if (!aborting && abort) {
         documentsWriter.abort();
       }
     }
@@ -222,13 +276,15 @@ abstract class DocumentsWriterThreadPool
       ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
       
       try {
-        while (!threadState.isIdle || globalLock || aborting) {
+        while (!threadState.isIdle || globalLock || aborting || threadState.perThread.aborting) {
           threadStateAvailable.await();
         }
       } catch (InterruptedException ie) {
         throw new ThreadInterruptedException(ie);
       }
       
+      assert threadState.isIdle;
+      
       threadState.isIdle = false;
       threadState.start();
       

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Tue Jul 27 20:33:05 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Set;
 
 import java.util.List;
 import java.util.Map;
@@ -422,7 +423,7 @@ final class IndexFileDeleter {
   public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
 
     if (infoStream != null) {
-      message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+      message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
     }
 
     // Try again now to delete any previously un-deletable
@@ -442,18 +443,6 @@ final class IndexFileDeleter {
       // Decref files for commits that were deleted by the policy:
       deleteCommits();
     } else {
-
-      final List<String> docWriterFiles;
-      if (docWriter != null) {
-        docWriterFiles = docWriter.openFiles();
-        if (docWriterFiles != null)
-          // We must incRef these files before decRef'ing
-          // last files to make sure we don't accidentally
-          // delete them:
-          incRef(docWriterFiles);
-      } else
-        docWriterFiles = null;
-
       // DecRef old files from the last checkpoint, if any:
       int size = lastFiles.size();
       if (size > 0) {
@@ -465,8 +454,6 @@ final class IndexFileDeleter {
       // Save files so we can decr on next checkpoint/commit:
       lastFiles.add(segmentInfos.files(directory, false));
 
-      if (docWriterFiles != null)
-        lastFiles.add(docWriterFiles);
     }
   }
 

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java?rev=979856&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java Tue Jul 27 20:33:05 2010
@@ -0,0 +1,263 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.Directory;
+
+/** Holds shared SegmentReader instances. IndexWriter uses
+ *  SegmentReaders for 1) applying deletes, 2) doing
+ *  merges, 3) handing out a real-time reader.  This pool
+ *  reuses instances of the SegmentReaders in all these
+ *  places if it is in "near real-time mode" (getReader()
+ *  has been called on this instance). */
+public class IndexReaderPool {
+
+  private final Map<SegmentInfo,SegmentReader> readerMap = new HashMap<SegmentInfo,SegmentReader>();
+
+  private final Directory directory;
+  private final IndexWriterConfig config;
+  private final IndexWriter writer;
+  
+  public IndexReaderPool(IndexWriter writer, Directory directory, IndexWriterConfig config) {
+    this.directory = directory;
+    this.config = config;
+    this.writer = writer;
+  }
+  
+  /** Forcefully clear changes for the specified segments,
+   *  and remove from the pool.   This is called on successful merge. */
+  synchronized void clear(SegmentInfos infos) throws IOException {
+    if (infos == null) {
+      for (Map.Entry<SegmentInfo,SegmentReader> ent: readerMap.entrySet()) {
+        ent.getValue().hasChanges = false;
+      }
+    } else {
+      for (final SegmentInfo info: infos) {
+        if (readerMap.containsKey(info)) {
+          readerMap.get(info).hasChanges = false;
+        }
+      }     
+    }
+  }
+  
+  /**
+   * Release the segment reader (i.e. decRef it and close if there
+   * are no more references.
+   * @param sr
+   * @throws IOException
+   */
+  public synchronized void release(SegmentReader sr) throws IOException {
+    release(sr, false);
+  }
+  
+  /**
+   * Release the segment reader (i.e. decRef it and close if there
+   * are no more references.
+   * @param sr
+   * @throws IOException
+   */
+  public synchronized void release(SegmentReader sr, boolean drop) throws IOException {
+
+    final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
+
+    assert !pooled | readerMap.get(sr.getSegmentInfo()) == sr;
+
+    // Drop caller's ref; for an external reader (not
+    // pooled), this decRef will close it
+    sr.decRef();
+
+    if (pooled && (drop || (!writer.poolReaders && sr.getRefCount() == 1))) {
+
+      // We are the last ref to this reader; since we're
+      // not pooling readers, we release it:
+      readerMap.remove(sr.getSegmentInfo());
+
+      // nocommit
+      //assert !sr.hasChanges || Thread.holdsLock(IndexWriter.this);
+
+      // Drop our ref -- this will commit any pending
+      // changes to the dir
+      boolean success = false;
+      try {
+        sr.close();
+        success = true;
+      } finally {
+        if (!success && sr.hasChanges) {
+          // Abandon the changes & retry closing:
+          sr.hasChanges = false;
+          try {
+            sr.close();
+          } catch (Throwable ignore) {
+            // Keep throwing original exception
+          }
+        }
+      }
+    }
+  }
+  
+  /** Remove all our references to readers, and commits
+   *  any pending changes. */
+  synchronized void close() throws IOException {
+    Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
+    while (iter.hasNext()) {
+      
+      Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
+
+      SegmentReader sr = ent.getValue();
+      if (sr.hasChanges) {
+        assert writer.infoIsLive(sr.getSegmentInfo());
+        sr.startCommit();
+        boolean success = false;
+        try {
+          sr.doCommit(null);
+          success = true;
+        } finally {
+          if (!success) {
+            sr.rollbackCommit();
+          }
+        }
+      }
+
+      iter.remove();
+
+      // NOTE: it is allowed that this decRef does not
+      // actually close the SR; this can happen when a
+      // near real-time reader is kept open after the
+      // IndexWriter instance is closed
+      sr.decRef();
+    }
+  }
+  
+  /**
+   * Commit all segment reader in the pool.
+   * @throws IOException
+   */
+  synchronized void commit() throws IOException {
+    for (Map.Entry<SegmentInfo,SegmentReader> ent : readerMap.entrySet()) {
+
+      SegmentReader sr = ent.getValue();
+      if (sr.hasChanges) {
+        assert writer.infoIsLive(sr.getSegmentInfo());
+        sr.startCommit();
+        boolean success = false;
+        try {
+          sr.doCommit(null);
+          success = true;
+        } finally {
+          if (!success) {
+            sr.rollbackCommit();
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Returns a ref to a clone.  NOTE: this clone is not
+   * enrolled in the pool, so you should simply close()
+   * it when you're done (ie, do not call release()).
+   */
+  public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor) throws IOException {
+    SegmentReader sr = get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, termInfosIndexDivisor);
+    try {
+      return (SegmentReader) sr.clone(true);
+    } finally {
+      sr.decRef();
+    }
+  }
+ 
+  /**
+   * Obtain a SegmentReader from the readerPool.  The reader
+   * must be returned by calling {@link #release(SegmentReader)}
+   * @see #release(SegmentReader)
+   * @param info
+   * @param doOpenStores
+   * @throws IOException
+   */
+  public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException {
+    return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, config.getReaderTermsIndexDivisor());
+  }
+
+  /**
+   * Obtain a SegmentReader from the readerPool.  The reader
+   * must be returned by calling {@link #release(SegmentReader)}
+   * 
+   * @see #release(SegmentReader)
+   * @param info
+   * @param doOpenStores
+   * @param readBufferSize
+   * @param termsIndexDivisor
+   * @throws IOException
+   */
+  public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, int readBufferSize, int termsIndexDivisor) throws IOException {
+
+    if (writer.poolReaders) {
+      readBufferSize = BufferedIndexInput.BUFFER_SIZE;
+    }
+
+    SegmentReader sr = readerMap.get(info);
+    if (sr == null) {
+      // TODO: we may want to avoid doing this while
+      // synchronized
+      // Returns a ref, which we xfer to readerMap:
+      sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor, config.getCodecProvider());
+
+      if (info.dir == directory) {
+        // Only pool if reader is not external
+        readerMap.put(info, sr);
+      }
+    } else {
+      if (doOpenStores) {
+        sr.openDocStores();
+      }
+      if (termsIndexDivisor != -1) {
+        // If this reader was originally opened because we
+        // needed to merge it, we didn't load the terms
+        // index.  But now, if the caller wants the terms
+        // index (eg because it's doing deletes, or an NRT
+        // reader is being opened) we ask the reader to
+        // load its terms index.
+        sr.loadTermsIndex(termsIndexDivisor);
+      }
+    }
+
+    // Return a ref to our caller
+    if (info.dir == directory) {
+      // Only incRef if we pooled (reader is not external)
+      sr.incRef();
+    }
+    return sr;
+  }
+
+  // Returns a ref
+  public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException {
+    SegmentReader sr = readerMap.get(info);
+    if (sr != null) {
+      sr.incRef();
+    }
+    return sr;
+  }
+}
+
+

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue Jul 27 20:33:05 2010
@@ -271,8 +271,7 @@ public class IndexWriter implements Clos
   volatile SegmentInfos pendingCommit;            // set when a commit is pending (after prepareCommit() & before commit())
   volatile long pendingCommitChangeCount;
 
-  // nocommit - private
-  SegmentInfos segmentInfos = new SegmentInfos();       // the segments
+  private final SegmentInfos segmentInfos = new SegmentInfos();       // the segments
 
   private DocumentsWriter docWriter;
   //nocommit - private
@@ -304,7 +303,7 @@ public class IndexWriter implements Clos
   private int flushCount;
   private int flushDeletesCount;
 
-  final ReaderPool readerPool = new ReaderPool();
+  final IndexReaderPool readerPool;
   
   // This is a "write once" variable (like the organic dye
   // on a DVD-R that may or may not be heated by a laser and
@@ -315,7 +314,7 @@ public class IndexWriter implements Clos
   // reuse SegmentReader instances internally for applying
   // deletes, doing merges, and reopening near real-time
   // readers.
-  private volatile boolean poolReaders;
+  volatile boolean poolReaders;
 
   // The instance that was passed to the constructor. It is saved only in order
   // to allow users to query an IndexWriter settings.
@@ -427,246 +426,21 @@ public class IndexWriter implements Clos
       return r;
     }
   }
-
-  /** Holds shared SegmentReader instances. IndexWriter uses
-   *  SegmentReaders for 1) applying deletes, 2) doing
-   *  merges, 3) handing out a real-time reader.  This pool
-   *  reuses instances of the SegmentReaders in all these
-   *  places if it is in "near real-time mode" (getReader()
-   *  has been called on this instance). */
-
-  class ReaderPool {
-
-    private final Map<SegmentInfo,SegmentReader> readerMap = new HashMap<SegmentInfo,SegmentReader>();
-
-    /** Forcefully clear changes for the specified segments,
-     *  and remove from the pool.   This is called on successful merge. */
-    synchronized void clear(SegmentInfos infos) throws IOException {
-      if (infos == null) {
-        for (Map.Entry<SegmentInfo,SegmentReader> ent: readerMap.entrySet()) {
-          ent.getValue().hasChanges = false;
-        }
-      } else {
-        for (final SegmentInfo info: infos) {
-          if (readerMap.containsKey(info)) {
-            readerMap.get(info).hasChanges = false;
-          }
-        }     
-      }
-    }
-    
-    // used only by asserts
-    public synchronized boolean infoIsLive(SegmentInfo info) {
-      int idx = segmentInfos.indexOf(info);
-      assert idx != -1;
-      assert segmentInfos.get(idx) == info;
-      return true;
-    }
-
-    public synchronized SegmentInfo mapToLive(SegmentInfo info) {
-      int idx = segmentInfos.indexOf(info);
-      if (idx != -1) {
-        info = segmentInfos.get(idx);
-      }
-      return info;
-    }
-    
-    /**
-     * Release the segment reader (i.e. decRef it and close if there
-     * are no more references.
-     * @param sr
-     * @throws IOException
-     */
-    public synchronized void release(SegmentReader sr) throws IOException {
-      release(sr, false);
-    }
-    
-    /**
-     * Release the segment reader (i.e. decRef it and close if there
-     * are no more references.
-     * @param sr
-     * @throws IOException
-     */
-    public synchronized void release(SegmentReader sr, boolean drop) throws IOException {
-
-      final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
-
-      assert !pooled | readerMap.get(sr.getSegmentInfo()) == sr;
-
-      // Drop caller's ref; for an external reader (not
-      // pooled), this decRef will close it
-      sr.decRef();
-
-      if (pooled && (drop || (!poolReaders && sr.getRefCount() == 1))) {
-
-        // We are the last ref to this reader; since we're
-        // not pooling readers, we release it:
-        readerMap.remove(sr.getSegmentInfo());
-
-        assert !sr.hasChanges || Thread.holdsLock(IndexWriter.this);
-
-        // Drop our ref -- this will commit any pending
-        // changes to the dir
-        boolean success = false;
-        try {
-          sr.close();
-          success = true;
-        } finally {
-          if (!success && sr.hasChanges) {
-            // Abandon the changes & retry closing:
-            sr.hasChanges = false;
-            try {
-              sr.close();
-            } catch (Throwable ignore) {
-              // Keep throwing original exception
-            }
-          }
-        }
-      }
-    }
-    
-    /** Remove all our references to readers, and commits
-     *  any pending changes. */
-    synchronized void close() throws IOException {
-      Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
-      while (iter.hasNext()) {
-        
-        Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
-
-        SegmentReader sr = ent.getValue();
-        if (sr.hasChanges) {
-          assert infoIsLive(sr.getSegmentInfo());
-          sr.startCommit();
-          boolean success = false;
-          try {
-            sr.doCommit(null);
-            success = true;
-          } finally {
-            if (!success) {
-              sr.rollbackCommit();
-            }
-          }
-        }
-
-        iter.remove();
-
-        // NOTE: it is allowed that this decRef does not
-        // actually close the SR; this can happen when a
-        // near real-time reader is kept open after the
-        // IndexWriter instance is closed
-        sr.decRef();
-      }
-    }
-    
-    /**
-     * Commit all segment reader in the pool.
-     * @throws IOException
-     */
-    synchronized void commit() throws IOException {
-      for (Map.Entry<SegmentInfo,SegmentReader> ent : readerMap.entrySet()) {
-
-        SegmentReader sr = ent.getValue();
-        if (sr.hasChanges) {
-          assert infoIsLive(sr.getSegmentInfo());
-          sr.startCommit();
-          boolean success = false;
-          try {
-            sr.doCommit(null);
-            success = true;
-          } finally {
-            if (!success) {
-              sr.rollbackCommit();
-            }
-          }
-        }
-      }
-    }
-    
-    /**
-     * Returns a ref to a clone.  NOTE: this clone is not
-     * enrolled in the pool, so you should simply close()
-     * it when you're done (ie, do not call release()).
-     */
-    public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor) throws IOException {
-      SegmentReader sr = get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, termInfosIndexDivisor);
-      try {
-        return (SegmentReader) sr.clone(true);
-      } finally {
-        sr.decRef();
-      }
-    }
-   
-    /**
-     * Obtain a SegmentReader from the readerPool.  The reader
-     * must be returned by calling {@link #release(SegmentReader)}
-     * @see #release(SegmentReader)
-     * @param info
-     * @param doOpenStores
-     * @throws IOException
-     */
-    public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException {
-      return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, config.getReaderTermsIndexDivisor());
-    }
-
-    /**
-     * Obtain a SegmentReader from the readerPool.  The reader
-     * must be returned by calling {@link #release(SegmentReader)}
-     * 
-     * @see #release(SegmentReader)
-     * @param info
-     * @param doOpenStores
-     * @param readBufferSize
-     * @param termsIndexDivisor
-     * @throws IOException
-     */
-    public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, int readBufferSize, int termsIndexDivisor) throws IOException {
-
-      if (poolReaders) {
-        readBufferSize = BufferedIndexInput.BUFFER_SIZE;
-      }
-
-      SegmentReader sr = readerMap.get(info);
-      if (sr == null) {
-        // TODO: we may want to avoid doing this while
-        // synchronized
-        // Returns a ref, which we xfer to readerMap:
-        sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor, codecs);
-
-        if (info.dir == directory) {
-          // Only pool if reader is not external
-          readerMap.put(info, sr);
-        }
-      } else {
-        if (doOpenStores) {
-          sr.openDocStores();
-        }
-        if (termsIndexDivisor != -1) {
-          // If this reader was originally opened because we
-          // needed to merge it, we didn't load the terms
-          // index.  But now, if the caller wants the terms
-          // index (eg because it's doing deletes, or an NRT
-          // reader is being opened) we ask the reader to
-          // load its terms index.
-          sr.loadTermsIndex(termsIndexDivisor);
-        }
-      }
-
-      // Return a ref to our caller
-      if (info.dir == directory) {
-        // Only incRef if we pooled (reader is not external)
-        sr.incRef();
-      }
-      return sr;
-    }
-
-    // Returns a ref
-    public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException {
-      SegmentReader sr = readerMap.get(info);
-      if (sr != null) {
-        sr.incRef();
-      }
-      return sr;
+  
+  // used only by asserts
+  public synchronized boolean infoIsLive(SegmentInfo info) {
+    int idx = segmentInfos.indexOf(info);
+    assert idx != -1;
+    assert segmentInfos.get(idx) == info;
+    return true;
+  }
+  
+  public synchronized SegmentInfo mapToLive(SegmentInfo info) {
+    int idx = segmentInfos.indexOf(info);
+    if (idx != -1) {
+      info = segmentInfos.get(idx);
     }
+    return info;
   }
   
   /**
@@ -934,6 +708,8 @@ public class IndexWriter implements Clos
     
     poolReaders = conf.getReaderPooling();
 
+    this.readerPool = new IndexReaderPool(this, directory, config);
+    
     OpenMode mode = conf.getOpenMode();
     boolean create;
     if (mode == OpenMode.CREATE) {
@@ -1784,7 +1560,7 @@ public class IndexWriter implements Clos
           if (infoStream != null) {
             message("hit exception updating document");
           }
-
+          
           synchronized (this) {
             // If docWriter has some aborted files that were
             // never incref'd, then we clean them up here
@@ -2419,6 +2195,15 @@ public class IndexWriter implements Clos
     deleter.checkpoint(segmentInfos, false);
   }
 
+  synchronized void addNewSegment(SegmentInfo newSegment) throws IOException {
+    segmentInfos.add(newSegment);
+    checkpoint();
+  }
+  
+  boolean useCompoundFile(SegmentInfo segmentInfo) {
+    return mergePolicy.useCompoundFile(segmentInfos, segmentInfo);
+  }
+  
   private synchronized void resetMergeExceptions() {
     mergeExceptions = new ArrayList<MergePolicy.OneMerge>();
     mergeGen++;
@@ -2793,7 +2578,7 @@ public class IndexWriter implements Clos
     if (pendingCommit != null) {
       try {
         if (infoStream != null)
-    	  message("commit: pendingCommit != null");
+        message("commit: pendingCommit != null");
         pendingCommit.finishCommit(directory);
         if (infoStream != null)
           message("commit: wrote segments file \"" + pendingCommit.getCurrentSegmentFileName() + "\"");
@@ -2828,25 +2613,37 @@ public class IndexWriter implements Clos
   protected final void flush(boolean triggerMerge, boolean flushDeletes) throws CorruptIndexException, IOException {
     // We can be called during close, when closing==true, so we must pass false to ensureOpen:
     ensureOpen(false);
-    if (doFlush(flushDeletes) && triggerMerge)
+    
+    doBeforeFlush();
+    
+    if (flushDeletes) {
+      if (applyDeletes()) {
+        checkpoint();
+      }
+    }
+    boolean maybeMerge = false;
+    boolean success = false;
+    try {
+      maybeMerge = docWriter.flushAllThreads(flushDeletes) && triggerMerge;
+      success = true;
+    } finally {
+      if (!success) {
+        synchronized (this) {
+          // If docWriter has some aborted files that were
+          // never incref'd, then we clean them up here
+          final Collection<String> files = docWriter.abortedFiles();
+          if (files != null) {
+            deleter.deleteNewFiles(files);
+          }
+        }
+      }
+    }
+    
+    doAfterFlush();
+    
+    if (maybeMerge) {
       maybeMerge();
-  }
-
-  // TODO: this method should not have to be entirely
-  // synchronized, ie, merges should be allowed to commit
-  // even while a flush is happening
-  private synchronized final boolean doFlush(boolean flushDeletes) throws CorruptIndexException, IOException {
-    return docWriter.flushAllThreads(flushDeletes);
-    // nocommit
-//    try {
-//      try {
-//        return doFlushInternal(flushDocStores, flushDeletes);
-//      } finally {
-//        docWriter.balanceRAM();
-//      }
-//    } finally {
-//      docWriter.clearFlushPending();
-//    }
+    }
   }
 
   /** Expert:  Return the total size of all index files currently cached in memory.

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java Tue Jul 27 20:33:05 2010
@@ -79,9 +79,6 @@ final class StoredFieldsWriter {
       state.flushedFiles.add(fieldsName);
       state.flushedFiles.add(fieldsIdxName);
 
-      docWriter.removeOpenFile(fieldsName);
-      docWriter.removeOpenFile(fieldsIdxName);
-
       if (4+((long) state.numDocs)*8 != state.directory.fileLength(fieldsIdxName)) {
         throw new RuntimeException("after flush: fdx size mismatch: " + state.numDocs + " docs vs " + state.directory.fileLength(fieldsIdxName) + " length in bytes of " + fieldsIdxName + " file exists?=" + state.directory.fileExists(fieldsIdxName));
       }
@@ -96,8 +93,6 @@ final class StoredFieldsWriter {
         fieldsWriter = new FieldsWriter(docWriter.directory,
                                         segment,
                                         fieldInfos);
-        docWriter.addOpenFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.FIELDS_EXTENSION));
-        docWriter.addOpenFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.FIELDS_INDEX_EXTENSION));
         lastDocID = 0;
       }
     }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Tue Jul 27 20:33:05 2010
@@ -75,10 +75,6 @@ final class TermVectorsTermsWriter exten
       state.flushedFiles.add(fldName);
       state.flushedFiles.add(docName);
 
-      docWriter.removeOpenFile(idxName);
-      docWriter.removeOpenFile(fldName);
-      docWriter.removeOpenFile(docName);
-
       lastDocID = 0;
 
     }
@@ -105,7 +101,7 @@ final class TermVectorsTermsWriter exten
     }
   }
 
-  void initTermVectorsWriter() throws IOException {        
+  private final void initTermVectorsWriter() throws IOException {        
     if (tvx == null) {
       
       final String segment = docWriter.getSegment();
@@ -128,10 +124,6 @@ final class TermVectorsTermsWriter exten
       tvd.writeInt(TermVectorsReader.FORMAT_CURRENT);
       tvf.writeInt(TermVectorsReader.FORMAT_CURRENT);
 
-      docWriter.addOpenFile(idxName);
-      docWriter.addOpenFile(fldName);
-      docWriter.addOpenFile(docName);
-
       lastDocID = 0;
     }
   }
@@ -146,7 +138,8 @@ final class TermVectorsTermsWriter exten
     fill(docState.docID);
     
     // Append term vectors to the real outputs:
-    tvx.writeLong(tvd.getFilePointer());
+    long pointer = tvd.getFilePointer();
+    tvx.writeLong(pointer);
     tvx.writeLong(tvf.getFilePointer());
     tvd.writeVInt(numVectorFields);
     if (numVectorFields > 0) {
@@ -198,7 +191,7 @@ final class TermVectorsTermsWriter exten
     }
     lastDocID = 0;
     
-
+    reset();
   }
 
   int numVectorFields;

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java Tue Jul 27 20:33:05 2010
@@ -82,9 +82,12 @@ final class TermsHash extends InvertedDo
   @Override
   public void abort() {
     reset();
-    consumer.abort();
-    if (nextTermsHash != null) {
-      nextTermsHash.abort();
+    try {
+      consumer.abort();
+    } finally {
+      if (nextTermsHash != null) {
+        nextTermsHash.abort();
+      }
     }
   }
   

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java?rev=979856&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java Tue Jul 27 20:33:05 2010
@@ -0,0 +1,111 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+
+public abstract class FilterDirectory extends Directory {
+  private final Directory delegate;
+  
+  public FilterDirectory(Directory delegate) {
+    this.delegate = delegate;
+  }
+  
+  @Override
+  public String[] listAll() throws IOException {
+    return delegate.listAll();
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return delegate.fileExists(name);
+  }
+
+  @Override
+  public long fileModified(String name) throws IOException {
+    return delegate.fileModified(name);
+  }
+  
+  @Override
+  public void touchFile(String name) throws IOException {
+    delegate.touchFile(name);
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    delegate.deleteFile(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return delegate.fileLength(name);
+  }
+
+  @Override
+  public IndexOutput createOutput(String name) throws IOException {
+    return delegate.createOutput(name);
+  }
+
+  @Override
+  public IndexInput openInput(String name) throws IOException {
+    return delegate.openInput(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+  
+  @Deprecated @Override
+  public void sync(String name) throws IOException { // TODO 4.0 kill me
+    delegate.sync(name);
+  }
+  
+  public void sync(Collection<String> names) throws IOException { // TODO 4.0 make me abstract
+    delegate.sync(names);
+  }
+
+  public IndexInput openInput(String name, int bufferSize) throws IOException {
+    return delegate.openInput(name, bufferSize);
+  }
+  
+  public Lock makeLock(String name) {
+    return delegate.makeLock(name);
+  }
+  
+  public void clearLock(String name) throws IOException {
+    delegate.clearLock(name);
+  }
+  
+  public void setLockFactory(LockFactory lockFactory) {
+    delegate.setLockFactory(lockFactory);
+  }
+  
+  public LockFactory getLockFactory() {
+    return delegate.getLockFactory();
+  }
+  
+  public String getLockID() {
+    return delegate.getLockID();
+  }
+  
+  public void copy(Directory to, String src, String dest) throws IOException {
+    delegate.copy(to, src, dest);
+  }
+}

Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Tue Jul 27 20:33:05 2010
@@ -48,7 +48,7 @@ public class TestConcurrentMergeSchedule
       if (doFail && Thread.currentThread().getName().equals("main")) {
         StackTraceElement[] trace = new Exception().getStackTrace();
         for (int i = 0; i < trace.length; i++) {
-          if ("doFlush".equals(trace[i].getMethodName())) {
+          if ("flush".equals(trace[i].getMethodName())) {
             hitExc = true;
             throw new IOException("now failing during flush");
           }

Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Tue Jul 27 20:33:05 2010
@@ -2351,7 +2351,7 @@ public class TestIndexWriter extends Luc
         StackTraceElement[] trace = new Exception().getStackTrace();
         for (int i = 0; i < trace.length; i++) {
           if ("abort".equals(trace[i].getMethodName()) ||
-              "flushDocument".equals(trace[i].getMethodName())) {
+              "finishDocument".equals(trace[i].getMethodName())) {
             if (onlyOnce)
               doFail = false;
             //System.out.println(Thread.currentThread().getName() + ": now fail");
@@ -2419,7 +2419,7 @@ public class TestIndexWriter extends Luc
 
       for(int i=0;i<NUM_THREADS;i++) {
         threads[i].join();
-        assertTrue("hit unexpected Throwable", threads[i].error == null);
+        assertTrue("hit unexpected Throwable ", threads[i].error == null);
       }
 
       boolean success = false;
@@ -2481,7 +2481,8 @@ public class TestIndexWriter extends Luc
       if (doFail) {
         StackTraceElement[] trace = new Exception().getStackTrace();
         for (int i = 0; i < trace.length; i++) {
-          if ("closeDocStore".equals(trace[i].getMethodName())) {
+          if ("finishDocument".equals(trace[i].getMethodName())
+              && "org.apache.lucene.index.DocFieldProcessor".equals(trace[i].getClassName())) {
             if (onlyOnce)
               doFail = false;
             throw new IOException("now failing on purpose");
@@ -3007,7 +3008,7 @@ public class TestIndexWriter extends Luc
 
     @Override
     boolean testPoint(String name) {
-      if (doFail && name.equals("DocumentsWriter.ThreadState.init start"))
+      if (doFail && name.equals("DocumentsWriterPerThread.init start"))
         throw new RuntimeException("intentionally failing");
       return true;
     }
@@ -3020,7 +3021,7 @@ public class TestIndexWriter extends Luc
     Document doc = new Document();
     doc.add(new Field("field", "a field", Field.Store.YES,
                       Field.Index.ANALYZED));
-    w.addDocument(doc);
+
     w.doFail = true;
     try {
       w.addDocument(doc);
@@ -4945,7 +4946,7 @@ public class TestIndexWriter extends Luc
     doc.add(new Field("c", "val", Store.YES, Index.ANALYZED, TermVector.WITH_POSITIONS_OFFSETS));
     writer.addDocument(doc);
     // The second document should cause a flush.
-    assertTrue("flush should have occurred and files created", dir.listAll().length > 5);
+    assertTrue("flush should have occurred and files created", dir.listAll().length > 0);
    
     // After rollback, IW should remove all files
     writer.rollback();

Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Tue Jul 27 20:33:05 2010
@@ -189,8 +189,9 @@ public class TestIndexWriterExceptions e
       threads[i].join();
 
     for(int i=0;i<NUM_THREADS;i++)
-      if (threads[i].failure != null)
+      if (threads[i].failure != null) {
         fail("thread " + threads[i].getName() + ": hit unexpected failure");
+      }
 
     writer.commit();
 

Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java Tue Jul 27 20:33:05 2010
@@ -30,8 +30,7 @@ public class TestNRTReaderWithThreads ex
   Random random = new Random();
   HeavyAtomicInt seq = new HeavyAtomicInt(1);
 
-  // nocommit
-  public void _testIndexing() throws Exception {
+  public void testIndexing() throws Exception {
     Directory mainDir = new MockRAMDirectory();
     IndexWriter writer = new IndexWriter(mainDir, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()).setMaxBufferedDocs(10));
     ((LogMergePolicy) writer.getConfig().getMergePolicy()).setMergeFactor(2);

Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java Tue Jul 27 20:33:05 2010
@@ -136,8 +136,7 @@ public class TestThreadedOptimize extend
     Run above stress test against RAMDirectory and then
     FSDirectory.
   */
-  // nocommit
-  public void _testThreadedOptimize() throws Exception {
+  public void testThreadedOptimize() throws Exception {
     Directory directory = new MockRAMDirectory();
     runTest(directory, new SerialMergeScheduler());
     runTest(directory, new ConcurrentMergeScheduler());