Posted to commits@lucene.apache.org by bu...@apache.org on 2011/01/17 10:23:43 UTC

svn commit: r1059822 - in /lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index: DocumentsWriter.java DocumentsWriterPerThread.java FieldInfos.java IndexWriter.java SegmentDeletes.java

Author: buschmi
Date: Mon Jan 17 09:23:42 2011
New Revision: 1059822

URL: http://svn.apache.org/viewvc?rev=1059822&view=rev
Log:
LUCENE-2324: fix more junits in rt branch

Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfos.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentDeletes.java

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1059822&r1=1059821&r2=1059822&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan 17 09:23:42 2011
@@ -105,8 +105,6 @@ final class DocumentsWriter {
   final AtomicLong bytesUsed = new AtomicLong(0);
   Directory directory;
 
-  int numDocsInStore;                     // # docs written to doc stores
-
   boolean bufferIsFull;                   // True when it's time to write segment
   private volatile boolean closed;
 
@@ -172,7 +170,7 @@ final class DocumentsWriter {
       }
     }
 
-    return true;
+    return false;
   }
 
   boolean deleteQuery(final Query query) throws IOException {
@@ -219,7 +217,7 @@ final class DocumentsWriter {
       if (state != exclude) {
         state.lock();
         try {
-          state.perThread.deleteTerm(term);
+          state.perThread.deleteTerms(term);
         } finally {
           state.unlock();
         }
@@ -348,18 +346,53 @@ final class DocumentsWriter {
   }
 
   synchronized boolean anyChanges() {
-    // nocommit
-    return numDocsInRAM.get() != 0;
-    //return numDocsInRAM.get() != 0 || pendingDeletes.any();
+    return numDocsInRAM.get() != 0 || anyDeletions();
+  }
+
+  public int getBufferedDeleteTermsSize() {
+    int size = 0;
+    Iterator<ThreadState> it = perThreadPool.getActivePerThreadsIterator();
+    while (it.hasNext()) {
+      DocumentsWriterPerThread dwpt = it.next().perThread;
+      size += dwpt.pendingDeletes.terms.size();
+    }
+    size += pendingDeletes.terms.size();
+    return size;
   }
 
-  // for testing
-  public synchronized SegmentDeletes getPendingDeletes() {
-    return pendingDeletes;
+  // for testing
+  public int getNumBufferedDeleteTerms() {
+    int numDeletes = 0;
+    Iterator<ThreadState> it = perThreadPool.getActivePerThreadsIterator();
+    while (it.hasNext()) {
+      DocumentsWriterPerThread dwpt = it.next().perThread;
+      numDeletes += dwpt.pendingDeletes.numTermDeletes.get();
+    }
+    numDeletes += pendingDeletes.numTermDeletes.get();
+    return numDeletes;
   }
 
+  // TODO: can we improve performance of this method by keeping track
+  // here in DW of whether any DWPT has deletions?
   public synchronized boolean anyDeletions() {
-    return pendingDeletes.any();
+    if (pendingDeletes.any()) {
+      return true;
+    }
+
+    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+    while (threadsIterator.hasNext()) {
+      ThreadState state = threadsIterator.next();
+      state.lock();
+      try {
+        if (state.perThread.pendingDeletes.any()) {
+          return true;
+        }
+      } finally {
+        state.unlock();
+      }
+    }
+
+    return false;
   }
 
   synchronized void close() {
@@ -372,31 +405,34 @@ final class DocumentsWriter {
     ensureOpen();
 
     SegmentInfo newSegment = null;
+    SegmentDeletes segmentDeletes = null;
 
     ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
     try {
       DocumentsWriterPerThread dwpt = perThread.perThread;
       long perThreadRAMUsedBeforeAdd = dwpt.bytesUsed();
-      dwpt.addDocument(doc, analyzer);
-
-      if (delTerm != null) {
-        dwpt.deleteTerm(delTerm);
-      }
-      dwpt.commitDocument();
+      dwpt.updateDocument(doc, analyzer, delTerm);
       numDocsInRAM.incrementAndGet();
 
       newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
-      if (newSegment != null && dwpt.pendingDeletes.any()) {
-        bufferedDeletes.pushDeletes(dwpt.pendingDeletes, newSegment);
-        dwpt.pendingDeletes = new SegmentDeletes();
+      if (newSegment != null) {
+        fieldInfos.update(dwpt.getFieldInfos());
+        if (dwpt.pendingDeletes.any()) {
+          segmentDeletes = dwpt.pendingDeletes;
+          dwpt.pendingDeletes = new SegmentDeletes();
+        }
       }
     } finally {
       perThread.unlock();
     }
 
+    if (segmentDeletes != null) {
+      pushDeletes(newSegment, segmentDeletes);
+    }
+
     if (newSegment != null) {
       perThreadPool.clearThreadBindings(perThread);
-      finishFlushedSegment(newSegment);
+      indexWriter.addFlushedSegment(newSegment);
       return true;
     }
 
@@ -413,14 +449,8 @@ final class DocumentsWriter {
       long perThreadRAMUsedBeforeAdd) throws IOException {
     SegmentInfo newSegment = null;
 
-    int numDocsPerThread = perThread.getNumDocsInRAM();
     if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
       newSegment = perThread.flush();
-
-      int oldValue = numDocsInRAM.get();
-      while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numDocsPerThread)) {
-        oldValue = numDocsInRAM.get();
-      }
     }
 
     long deltaRAM = perThread.bytesUsed() - perThreadRAMUsedBeforeAdd;
@@ -432,11 +462,20 @@ final class DocumentsWriter {
     return newSegment;
   }
 
-  private final void pushToLastSegment(SegmentDeletes segmentDeletes) {
+  final void substractFlushedNumDocs(int numFlushed) {
+    int oldValue = numDocsInRAM.get();
+    while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
+      oldValue = numDocsInRAM.get();
+    }
+  }
+
+  private final void pushDeletes(SegmentInfo segmentInfo, SegmentDeletes segmentDeletes) {
     synchronized(indexWriter) {
       // Lock order: DW -> BD
       if (segmentDeletes.any()) {
-        if (indexWriter.segmentInfos.size() > 0) {
+        if (segmentInfo != null) {
+          bufferedDeletes.pushDeletes(segmentDeletes, segmentInfo);
+        } else if (indexWriter.segmentInfos.size() > 0) {
           if (infoStream != null) {
             message("flush: push buffered deletes to previously flushed segment " + indexWriter.segmentInfos.lastElement());
           }
@@ -457,7 +496,10 @@ final class DocumentsWriter {
     throws IOException {
 
     if (flushDeletes) {
-      pushToLastSegment(pendingDeletes);
+      synchronized (this) {
+        pushDeletes(null, pendingDeletes);
+        pendingDeletes = new SegmentDeletes();
+      }
     }
 
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
@@ -465,6 +507,7 @@ final class DocumentsWriter {
 
     while (threadsIterator.hasNext()) {
       SegmentInfo newSegment = null;
+      SegmentDeletes segmentDeletes = null;
 
       ThreadState perThread = threadsIterator.next();
       perThread.lock();
@@ -484,75 +527,37 @@ final class DocumentsWriter {
           newSegment = dwpt.flush();
 
           if (newSegment != null) {
+            fieldInfos.update(dwpt.getFieldInfos());
             anythingFlushed = true;
             perThreadPool.clearThreadBindings(perThread);
             if (dwpt.pendingDeletes.any()) {
-              bufferedDeletes.pushDeletes(dwpt.pendingDeletes, newSegment);
+              segmentDeletes = dwpt.pendingDeletes;
               dwpt.pendingDeletes = new SegmentDeletes();
             }
           }
-        }
-        else if (flushDeletes && dwpt.pendingDeletes.any()) {
-          pushToLastSegment(dwpt.pendingDeletes);
+        } else if (flushDeletes && dwpt.pendingDeletes.any()) {
+          segmentDeletes = dwpt.pendingDeletes;
+          dwpt.pendingDeletes = new SegmentDeletes();
         }
       } finally {
         perThread.unlock();
       }
 
+      if (segmentDeletes != null) {
+        pushDeletes(newSegment, segmentDeletes);
+      }
+
       if (newSegment != null) {
         // important to unlock the perThread before addFlushedSegment
         // is called to prevent deadlock on IndexWriter mutex
-        finishFlushedSegment(newSegment);
+        indexWriter.addFlushedSegment(newSegment);
       }
     }
 
-    numDocsInRAM.set(0);
     return anythingFlushed;
   }
 
-  /** Build compound file for the segment we just flushed */
-  void createCompoundFile(String compoundFileName, Collection<String> flushedFiles) throws IOException {
-    CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
-    for(String fileName : flushedFiles) {
-      cfsWriter.addFile(fileName);
-    }
-
-    // Perform the merge
-    cfsWriter.close();
-  }
-
-  void finishFlushedSegment(SegmentInfo newSegment) throws IOException {
-    assert newSegment != null;
-
-    IndexWriter.setDiagnostics(newSegment, "flush");
-
-    if (indexWriter.useCompoundFile(newSegment)) {
-      String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
-      message("creating compound file " + compoundFileName);
-      // Now build compound file
-      boolean success = false;
-      try {
-        createCompoundFile(compoundFileName, newSegment.files());
-        success = true;
-      } finally {
-        if (!success) {
-          if (infoStream != null) {
-            message("hit exception " +
-                "reating compound file for newly flushed segment " + newSegment.name);
-          }
-
-          indexWriter.deleter.refresh(newSegment.name);
-        }
-      }
-
-      indexWriter.deleter.deleteNewFiles(newSegment.files());
-      newSegment.setUseCompoundFile(true);
-
-    }
-
-    indexWriter.addNewSegment(newSegment);
-  }
-
 //  /* We have three pools of RAM: Postings, byte blocks
 //   * (holds freq/prox posting data) and per-doc buffers
 //   * (stored fields/term vectors).  Different docs require
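
A note on the shape of the DocumentsWriter changes above: per-thread deletes are
detached while the ThreadState lock is held, but pushDeletes() (which takes the
IndexWriter monitor, lock order DW -> BD) only runs after unlocking. A minimal
sketch of that pattern, assuming the same fields as in this patch:

    // Detach per-thread state under the ThreadState lock, publish it after
    // unlocking, so the thread lock is never held while synchronizing on
    // indexWriter inside pushDeletes().
    SegmentDeletes captured = null;
    state.lock();
    try {
      if (state.perThread.pendingDeletes.any()) {
        captured = state.perThread.pendingDeletes;
        state.perThread.pendingDeletes = new SegmentDeletes();
      }
    } finally {
      state.unlock();
    }
    if (captured != null) {
      pushDeletes(newSegment, captured); // synchronized(indexWriter) inside
    }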

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1059822&r1=1059821&r2=1059822&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Mon Jan 17 09:23:42 2011
@@ -148,12 +148,13 @@ public class DocumentsWriterPerThread {
 
   final AtomicLong bytesUsed = new AtomicLong(0);
 
-  FieldInfos fieldInfos = new FieldInfos();
+  private final FieldInfos fieldInfos;
 
   public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
     this.directory = directory;
     this.parent = parent;
     this.writer = parent.indexWriter;
+    this.fieldInfos = new FieldInfos();
     this.infoStream = parent.indexWriter.getInfoStream();
     this.docState = new DocState(this);
     this.docState.similarity = parent.indexWriter.getConfig().getSimilarity();
@@ -169,11 +170,15 @@ public class DocumentsWriterPerThread {
     aborting = true;
   }
 
-  public void addDocument(Document doc, Analyzer analyzer) throws IOException {
+  public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException {
     assert writer.testPoint("DocumentsWriterPerThread addDocument start");
     docState.doc = doc;
     docState.analyzer = analyzer;
     docState.docID = numDocsInRAM;
+    if (delTerm != null) {
+      pendingDeletes.addTerm(delTerm, docState.docID);
+    }
+
     if (segment == null) {
       // this call is synchronized on IndexWriter.segmentInfos
       segment = writer.newSegmentName();
@@ -191,12 +196,15 @@ public class DocumentsWriterPerThread {
           // mark document as deleted
           deleteDocID(docState.docID);
           numDocsInRAM++;
+        } else {
+          abort();
         }
       }
     }
 
     success = false;
     try {
+      numDocsInRAM++;
       consumer.finishDocument();
 
       success = true;
@@ -228,24 +236,12 @@ public class DocumentsWriterPerThread {
     }
   }
 
-  void deleteQuery(Query query) {
-    pendingDeletes.addQuery(query, numDocsInRAM);
-  }
-
-  synchronized void deleteTerms(Term... terms) {
+  void deleteTerms(Term... terms) {
     for (Term term : terms) {
       pendingDeletes.addTerm(term, numDocsInRAM);
     }
   }
 
-  void deleteTerm(Term term) {
-    pendingDeletes.addTerm(term, numDocsInRAM);
-  }
-
-  public void commitDocument() {
-    numDocsInRAM++;
-  }
-
   int getNumDocsInRAM() {
     return numDocsInRAM;
   }
@@ -264,6 +260,7 @@ public class DocumentsWriterPerThread {
   /** Reset after a flush */
   private void doAfterFlush() throws IOException {
     segment = null;
+    parent.substractFlushedNumDocs(numDocsInRAM);
     numDocsInRAM = 0;
   }
 
@@ -279,45 +276,30 @@ public class DocumentsWriterPerThread {
       message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
     }
 
+    if (aborting) {
+      if (infoStream != null) {
+        message("flush: skip because aborting is set");
+      }
+      return null;
+    }
+
     boolean success = false;
 
     try {
-      consumer.flush(flushState);
 
-      boolean hasVectors = flushState.hasVectors;
+      SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false);
+      consumer.flush(flushState);
+      newSegment.setHasVectors(flushState.hasVectors);
 
       if (infoStream != null) {
-        SegmentInfo si = new SegmentInfo(flushState.segmentName,
-            flushState.numDocs,
-            directory, false,
-            hasProx(),
-            getCodec(),
-            hasVectors);
-
-        final long newSegmentSize = si.sizeInBytes(true);
-        String message = "  ramUsed=" + nf.format(((double) bytesUsed.get())/1024./1024.) + " MB" +
-          " newFlushedSize=" + newSegmentSize +
-          " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
-          " new/old=" + nf.format(100.0*newSegmentSize/bytesUsed.get()) + "%";
-        message(message);
+        message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
+        message("flushedFiles=" + newSegment.files());
+        message("flushed codecs=" + newSegment.getSegmentCodecs());
       }
-
       flushedDocCount += flushState.numDocs;
 
       doAfterFlush();
 
-      // Create new SegmentInfo, but do not add to our
-      // segmentInfos until deletes are flushed
-      // successfully.
-      SegmentInfo newSegment = new SegmentInfo(flushState.segmentName,
-                                   flushState.numDocs,
-                                   directory, false,
-                                   hasProx(),
-                                   getCodec(),
-                                   hasVectors);
-
-
-      IndexWriter.setDiagnostics(newSegment, "flush");
       success = true;
 
       return newSegment;
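
The updateDocument() introduced above folds the old addDocument()/deleteTerm()/
commitDocument() sequence into a single call: the delete term is buffered with
the incoming document's docID as its upper bound, so it deletes previously
indexed documents that match the term but not the replacement itself. A hedged
caller-side sketch (the Document/Field setup is illustrative; only
updateDocument() itself comes from this patch):

    // Atomic update: documents previously indexed with id:42 are marked
    // for deletion and the new document is added in the same call.
    Document doc = new Document();
    doc.add(new Field("id", "42", Field.Store.YES, Field.Index.NOT_ANALYZED));
    dwpt.updateDocument(doc, analyzer, new Term("id", "42"));

    // A plain add is the same call with no delete term:
    dwpt.updateDocument(doc, analyzer, null);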

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfos.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfos.java?rev=1059822&r1=1059821&r2=1059822&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfos.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfos.java Mon Jan 17 09:23:42 2011
@@ -42,9 +42,9 @@ public final class FieldInfos {
 
   // whenever you add a new format, make it 1 smaller (negative version logic)!
   static final int FORMAT_CURRENT = FORMAT_PER_FIELD_CODEC;
-  
+
   static final int FORMAT_MINIMUM = FORMAT_START;
-  
+
   static final byte IS_INDEXED = 0x1;
   static final byte STORE_TERMVECTOR = 0x2;
   static final byte STORE_POSITIONS_WITH_TERMVECTOR = 0x4;
@@ -52,7 +52,7 @@ public final class FieldInfos {
   static final byte OMIT_NORMS = 0x10;
   static final byte STORE_PAYLOADS = 0x20;
   static final byte OMIT_TERM_FREQ_AND_POSITIONS = 0x40;
-  
+
   private final ArrayList<FieldInfo> byNumber = new ArrayList<FieldInfo>();
   private final HashMap<String,FieldInfo> byName = new HashMap<String,FieldInfo>();
   private int format;
@@ -111,16 +111,16 @@ public final class FieldInfos {
     }
     return false;
   }
-  
+
   /**
    * Add fields that are indexed. Whether they have termvectors has to be specified.
-   * 
+   *
    * @param names The names of the fields
    * @param storeTermVectors Whether the fields store term vectors or not
    * @param storePositionWithTermVector true if positions should be stored.
    * @param storeOffsetWithTermVector true if offsets should be stored
    */
-  synchronized public void addIndexed(Collection<String> names, boolean storeTermVectors, boolean storePositionWithTermVector, 
+  synchronized public void addIndexed(Collection<String> names, boolean storeTermVectors, boolean storePositionWithTermVector,
                          boolean storeOffsetWithTermVector) {
     for (String name : names) {
       add(name, true, storeTermVectors, storePositionWithTermVector, storeOffsetWithTermVector);
@@ -129,10 +129,10 @@ public final class FieldInfos {
 
   /**
    * Assumes the fields are not storing term vectors.
-   * 
+   *
    * @param names The names of the fields
    * @param isIndexed Whether the fields are indexed or not
-   * 
+   *
    * @see #add(String, boolean)
    */
   synchronized public void add(Collection<String> names, boolean isIndexed) {
@@ -143,7 +143,7 @@ public final class FieldInfos {
 
   /**
    * Calls 5 parameter add with false for all TermVector parameters.
-   * 
+   *
    * @param name The name of the Fieldable
    * @param isIndexed true if the field is indexed
    * @see #add(String, boolean, boolean, boolean, boolean)
@@ -154,7 +154,7 @@ public final class FieldInfos {
 
   /**
    * Calls 5 parameter add with false for term vector positions and offsets.
-   * 
+   *
    * @param name The name of the field
    * @param isIndexed  true if the field is indexed
    * @param storeTermVector true if the term vector should be stored
@@ -162,12 +162,12 @@ public final class FieldInfos {
   synchronized public void add(String name, boolean isIndexed, boolean storeTermVector){
     add(name, isIndexed, storeTermVector, false, false, false);
   }
-  
+
   /** If the field is not yet known, adds it. If it is known, checks to make
    *  sure that the isIndexed flag is the same as was given previously for this
    *  field. If not - marks it as being indexed.  Same goes for the TermVector
    * parameters.
-   * 
+   *
    * @param name The name of the field
    * @param isIndexed true if the field is indexed
    * @param storeTermVector true if the term vector should be stored
@@ -197,7 +197,7 @@ public final class FieldInfos {
     add(name, isIndexed, storeTermVector, storePositionWithTermVector,
         storeOffsetWithTermVector, omitNorms, false, false);
   }
-  
+
   /** If the field is not yet known, adds it. If it is known, checks to make
    *  sure that the isIndexed flag is the same as was given previously for this
    *  field. If not - marks it as being indexed.  Same goes for the TermVector
@@ -231,8 +231,15 @@ public final class FieldInfos {
                fi.omitTermFreqAndPositions);
   }
 
+  synchronized public void update(FieldInfos otherInfos) {
+    int numFields = otherInfos.size();
+    for (int i = 0; i < numFields; i++) {
+      add(otherInfos.fieldInfo(i));
+    }
+  }
+
   private FieldInfo addInternal(String name, boolean isIndexed,
-                                boolean storeTermVector, boolean storePositionWithTermVector, 
+                                boolean storeTermVector, boolean storePositionWithTermVector,
                                 boolean storeOffsetWithTermVector, boolean omitNorms, boolean storePayloads, boolean omitTermFreqAndPositions) {
     name = StringHelper.intern(name);
     FieldInfo fi = new FieldInfo(name, isIndexed, byNumber.size(), storeTermVector, storePositionWithTermVector,
@@ -253,11 +260,11 @@ public final class FieldInfos {
 
   /**
    * Return the fieldName identified by its number.
-   * 
+   *
    * @param fieldNumber
    * @return the fieldName or an empty string when the field
    * with the given number doesn't exist.
-   */  
+   */
   public String fieldName(int fieldNumber) {
 	FieldInfo fi = fieldInfo(fieldNumber);
 	return (fi != null) ? fi.name : "";
@@ -268,7 +275,7 @@ public final class FieldInfos {
    * @param fieldNumber
    * @return the FieldInfo object or null when the given fieldNumber
    * doesn't exist.
-   */  
+   */
   public FieldInfo fieldInfo(int fieldNumber) {
 	return (fieldNumber >= 0) ? byNumber.get(fieldNumber) : null;
   }
@@ -353,7 +360,7 @@ public final class FieldInfos {
 
     if (input.getFilePointer() != input.length()) {
       throw new CorruptIndexException("did not read all bytes from file \"" + fileName + "\": read " + input.getFilePointer() + " vs size " + input.length());
-    }    
+    }
   }
 
 }
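
The new FieldInfos.update() lets DocumentsWriter fold each per-thread FieldInfos
back into the global one after a flush (fieldInfos.update(dwpt.getFieldInfos())
in the DocumentsWriter changes above). It simply re-adds every FieldInfo, and
add() upgrades flags rather than overwriting them, so the merge never downgrades
an existing field. A small sketch of those semantics, using the three-argument
add(name, isIndexed, storeTermVector) declared in this file:

    FieldInfos global = new FieldInfos();

    FieldInfos fromThreadA = new FieldInfos();
    fromThreadA.add("title", true, true);   // indexed, with term vectors

    FieldInfos fromThreadB = new FieldInfos();
    fromThreadB.add("title", true, false);  // indexed, no term vectors

    global.update(fromThreadA);
    global.update(fromThreadB);
    // "title" ends up indexed with term vectors: flags only accumulate,
    // regardless of the order in which the per-thread infos arrive.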

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1059822&r1=1059821&r2=1059822&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan 17 09:23:42 2011
@@ -222,7 +222,7 @@ public class IndexWriter implements Clos
   final SegmentInfos segmentInfos;       // the segments
 
   private DocumentsWriter docWriter;
-  final IndexFileDeleter deleter;
+  private final IndexFileDeleter deleter;
 
   private Set<SegmentInfo> segmentsToOptimize = new HashSet<SegmentInfo>();           // used by optimize to note those needing optimization
   private int optimizeMaxNumSegments;
@@ -1875,10 +1875,10 @@ public class IndexWriter implements Clos
       mergePolicy.close();
       mergeScheduler.close();
 
-      bufferedDeletes.clear();
-
       synchronized(this) {
 
+        bufferedDeletes.clear();
+
         if (pendingCommit != null) {
           pendingCommit.rollbackCommit(directory);
           deleter.decRef(pendingCommit);
@@ -2047,9 +2047,51 @@ public class IndexWriter implements Clos
     deleter.checkpoint(segmentInfos, false);
   }
 
-  synchronized void addNewSegment(SegmentInfo newSegment) throws IOException {
-    segmentInfos.add(newSegment);
-    checkpoint();
+  void addFlushedSegment(SegmentInfo newSegment) throws IOException {
+    assert newSegment != null;
+
+    setDiagnostics(newSegment, "flush");
+
+    if (useCompoundFile(newSegment)) {
+      String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+      message("creating compound file " + compoundFileName);
+      // Now build compound file
+      boolean success = false;
+      try {
+        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
+        for(String fileName : newSegment.files()) {
+          cfsWriter.addFile(fileName);
+        }
+
+        // Perform the merge
+        cfsWriter.close();
+        synchronized(this) {
+          deleter.deleteNewFiles(newSegment.files());
+        }
+
+        newSegment.setUseCompoundFile(true);
+
+        success = true;
+      } finally {
+        if (!success) {
+          if (infoStream != null) {
+            message("hit exception " +
+                "reating compound file for newly flushed segment " + newSegment.name);
+          }
+
+          synchronized(this) {
+            deleter.refresh(newSegment.name);
+          }
+        }
+      }
+
+    }
+
+    synchronized(this) {
+      segmentInfos.add(newSegment);
+      checkpoint();
+    }
   }
 
   synchronized boolean useCompoundFile(SegmentInfo segmentInfo) throws IOException {
@@ -2207,24 +2249,33 @@ public class IndexWriter implements Clos
     }
   }
 
-  /** Merges the provided indexes into this index.
-   * <p>After this completes, the index is optimized. </p>
-   * <p>The provided IndexReaders are not closed.</p>
-   *
-   * <p><b>NOTE:</b> while this is running, any attempts to
-   * add or delete documents (with another thread) will be
-   * paused until this method completes.
-   *
-   * <p>See {@link #addIndexes} for details on transactional
-   * semantics, temporary free space required in the Directory,
-   * and non-CFS segments on an Exception.</p>
+  /**
+   * Merges the provided indexes into this index.
+   * <p>
+   * After this completes, the index is optimized.
+   * </p>
+   * <p>
+   * The provided IndexReaders are not closed.
+   * </p>
    *
-   * <p><b>NOTE</b>: if this method hits an OutOfMemoryError
-   * you should immediately close the writer.  See <a
-   * href="#OOME">above</a> for details.</p>
+   * <p>
+   * <b>NOTE:</b> while this is running, any attempts to add or delete documents
+   * (with another thread) will be paused until this method completes.
    *
-   * @throws CorruptIndexException if the index is corrupt
-   * @throws IOException if there is a low-level IO error
+   * <p>
+   * See {@link #addIndexes} for details on transactional semantics, temporary
+   * free space required in the Directory, and non-CFS segments on an Exception.
+   * </p>
+   *
+   * <p>
+   * <b>NOTE</b>: if this method hits an OutOfMemoryError you should immediately
+   * close the writer. See <a href="#OOME">above</a> for details.
+   * </p>
+   *
+   * @throws CorruptIndexException
+   *           if the index is corrupt
+   * @throws IOException
+   *           if there is a low-level IO error
    */
   public void addIndexes(IndexReader... readers) throws CorruptIndexException, IOException {
     ensureOpen();
@@ -3239,12 +3290,12 @@ public class IndexWriter implements Clos
 
   // For test purposes.
   final int getBufferedDeleteTermsSize() {
-    return docWriter.getPendingDeletes().terms.size();
+    return docWriter.getBufferedDeleteTermsSize();
   }
 
   // For test purposes.
   final int getNumBufferedDeleteTerms() {
-    return docWriter.getPendingDeletes().numTermDeletes.get();
+    return docWriter.getNumBufferedDeleteTerms();
   }
 
   // utility routines for tests
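
addFlushedSegment() above takes over from DocumentsWriter.finishFlushedSegment()
and deliberately narrows synchronization: the compound-file build (heavy I/O)
runs without the IndexWriter monitor, which is taken only around the deleter
calls and the final segmentInfos update. The shape of the method, abridged from
the diff:

    // No IndexWriter lock while writing the compound file...
    CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
    for (String fileName : newSegment.files()) {
      cfsWriter.addFile(fileName);
    }
    cfsWriter.close();   // performs the merge into the compound file

    synchronized (this) {        // ...lock only for deleter bookkeeping
      deleter.deleteNewFiles(newSegment.files());
    }
    newSegment.setUseCompoundFile(true);

    synchronized (this) {        // ...and for publishing the new segment
      segmentInfos.add(newSegment);
      checkpoint();
    }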

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentDeletes.java?rev=1059822&r1=1059821&r2=1059822&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentDeletes.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentDeletes.java Mon Jan 17 09:23:42 2011
@@ -102,7 +102,7 @@ class SegmentDeletes {
       return s;
     }
   }
-  
+
   void update(SegmentDeletes in, boolean noLimit) {
     numTermDeletes.addAndGet(in.numTermDeletes.get());
     for (Map.Entry<Term,Integer> ent : in.terms.entrySet()) {
@@ -168,7 +168,7 @@ class SegmentDeletes {
       bytesUsed.addAndGet(BYTES_PER_DEL_TERM + term.bytes.length);
     }
   }
-    
+
   void clear() {
     terms.clear();
     queries.clear();
@@ -176,12 +176,12 @@ class SegmentDeletes {
     numTermDeletes.set(0);
     bytesUsed.set(0);
   }
-  
+
   void clearDocIDs() {
     bytesUsed.addAndGet(-docIDs.size()*BYTES_PER_DEL_DOCID);
     docIDs.clear();
   }
-  
+
   boolean any() {
     return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0;
   }