You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by bu...@apache.org on 2011/02/25 08:15:29 UTC

svn commit: r1074414 - in /lucene/dev/branches/realtime_search/lucene/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/

Author: buschmi
Date: Fri Feb 25 07:15:28 2011
New Revision: 1074414

URL: http://svn.apache.org/viewvc?rev=1074414&view=rev
Log:
LUCENE-2324: new doc deletes approach; various bug fixes

Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Feb 25 07:15:28 2011
@@ -27,13 +27,13 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.BitVector;
 
 /**
  * This class accepts multiple added documents and directly
@@ -145,28 +145,24 @@ final class DocumentsWriter {
   }
 
   boolean deleteQueries(final Query... queries) throws IOException {
+    synchronized(this) {
+      for (Query query : queries) {
+        pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
+      }
+    }
+
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
-    boolean deleted = false;
     while (threadsIterator.hasNext()) {
       ThreadState state = threadsIterator.next();
       state.lock();
       try {
         state.perThread.deleteQueries(queries);
-        deleted = true;
       } finally {
         state.unlock();
       }
     }
 
-    if (!deleted) {
-      synchronized(this) {
-        for (Query query : queries) {
-          pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
-        }
-      }
-    }
-
     return false;
   }
 
@@ -175,12 +171,16 @@ final class DocumentsWriter {
   }
 
   boolean deleteTerms(final Term... terms) throws IOException {
+    synchronized(this) {
+      for (Term term : terms) {
+        pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
+      }
+    }
+
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
-    boolean deleted = false;
     while (threadsIterator.hasNext()) {
       ThreadState state = threadsIterator.next();
-      deleted = true;
       state.lock();
       try {
         state.perThread.deleteTerms(terms);
@@ -189,14 +189,6 @@ final class DocumentsWriter {
       }
     }
 
-    if (!deleted) {
-      synchronized(this) {
-        for (Term term : terms) {
-          pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
-        }
-      }
-    }
-
     return false;
   }
 
@@ -207,12 +199,14 @@ final class DocumentsWriter {
     return deleteTerms(term);
   }
 
-  boolean deleteTerm(final Term term, ThreadState exclude) {
+  void deleteTerm(final Term term, ThreadState exclude) {
+    synchronized(this) {
+      pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
+    }
+
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
-    boolean deleted = false;
     while (threadsIterator.hasNext()) {
-      deleted = true;
       ThreadState state = threadsIterator.next();
       if (state != exclude) {
         state.lock();
@@ -223,8 +217,6 @@ final class DocumentsWriter {
         }
       }
     }
-
-    return deleted;
   }
 
   /** If non-null, various details of indexing are printed
@@ -303,6 +295,10 @@ final class DocumentsWriter {
   synchronized void abort() throws IOException {
     boolean success = false;
 
+    synchronized (this) {
+      pendingDeletes.clear();
+    }
+
     try {
       if (infoStream != null) {
         message("docWriter: abort");
@@ -328,7 +324,7 @@ final class DocumentsWriter {
     }
   }
 
-  synchronized boolean anyChanges() {
+  boolean anyChanges() {
     return numDocsInRAM.get() != 0 || anyDeletions();
   }
 
@@ -355,29 +351,10 @@ final class DocumentsWriter {
     return numDeletes;
   }
 
-  // TODO: can we improve performance of this method by keeping track
-  // here in DW of whether any DWPT has deletions?
-  public synchronized boolean anyDeletions() {
-    if (pendingDeletes.any()) {
-      return true;
+  public boolean anyDeletions() {
+    return pendingDeletes.any();
     }
 
-    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-    while (threadsIterator.hasNext()) {
-      ThreadState state = threadsIterator.next();
-      state.lock();
-      try {
-        if (state.perThread.pendingDeletes.any()) {
-          return true;
-        }
-      } finally {
-        state.unlock();
-      }
-    }
-
-    return false;
-  }
-
   void close() {
     closed = true;
   }
@@ -386,9 +363,7 @@ final class DocumentsWriter {
       throws CorruptIndexException, IOException {
     ensureOpen();
 
-    SegmentInfo newSegment = null;
-    BufferedDeletes segmentDeletes = null;
-    BitVector deletedDocs = null;
+    FlushedSegment newSegment = null;
 
     ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
     try {
@@ -398,39 +373,38 @@ final class DocumentsWriter {
       numDocsInRAM.incrementAndGet();
 
       newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
-      if (newSegment != null) {
-        deletedDocs = dwpt.flushState.deletedDocs;
-        if (dwpt.pendingDeletes.any()) {
-          segmentDeletes = dwpt.pendingDeletes;
-          dwpt.pendingDeletes = new BufferedDeletes(false);
-        }
-      }
     } finally {
       perThread.unlock();
     }
 
-    if (segmentDeletes != null) {
-      pushDeletes(newSegment, segmentDeletes);
+    // delete term from other DWPTs later, so that this thread
+    // doesn't have to lock multiple DWPTs at the same time
+    if (delTerm != null) {
+      deleteTerm(delTerm, perThread);
+    }
+
+    if (newSegment != null) {
+      finishFlushedSegment(newSegment);
     }
 
     if (newSegment != null) {
       perThreadPool.clearThreadBindings(perThread);
-      indexWriter.addFlushedSegment(newSegment, deletedDocs);
       return true;
     }
 
-    // delete term from other DWPTs later, so that this thread
-    // doesn't have to lock multiple DWPTs at the same time
-    if (delTerm != null) {
-      deleteTerm(delTerm, perThread);
+    return false;
     }
 
-    return false;
+  private void finishFlushedSegment(FlushedSegment newSegment) throws IOException {
+    pushDeletes(newSegment);
+    if (newSegment != null) {
+      indexWriter.addFlushedSegment(newSegment);
+  }
   }
 
-  private final SegmentInfo finishAddDocument(DocumentsWriterPerThread perThread,
+  private final FlushedSegment finishAddDocument(DocumentsWriterPerThread perThread,
       long perThreadRAMUsedBeforeAdd) throws IOException {
-    SegmentInfo newSegment = null;
+    FlushedSegment newSegment = null;
 
     if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
       newSegment = perThread.flush();
@@ -445,20 +419,21 @@ final class DocumentsWriter {
     return newSegment;
   }
 
-  final void substractFlushedNumDocs(int numFlushed) {
+  final void subtractFlushedNumDocs(int numFlushed) {
     int oldValue = numDocsInRAM.get();
     while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
       oldValue = numDocsInRAM.get();
     }
   }
 
-  private final void pushDeletes(SegmentInfo segmentInfo, BufferedDeletes segmentDeletes) {
-    synchronized(indexWriter) {
-      // Lock order: DW -> BD
+  private synchronized void pushDeletes(FlushedSegment flushedSegment) {
+    maybePushPendingDeletes();
+    if (flushedSegment != null) {
+      BufferedDeletes deletes = flushedSegment.segmentDeletes;
       final long delGen = bufferedDeletesStream.getNextGen();
-      if (segmentDeletes.any()) {
-        if (indexWriter.segmentInfos.size() > 0 || segmentInfo != null) {
-          final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(segmentDeletes, delGen);
+      // Lock order: DW -> BD
+      if (deletes != null && deletes.any()) {
+        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes, delGen);
           if (infoStream != null) {
             message("flush: push buffered deletes");
           }
@@ -466,40 +441,27 @@ final class DocumentsWriter {
           if (infoStream != null) {
             message("flush: delGen=" + packet.gen);
           }
-          if (segmentInfo != null) {
-            segmentInfo.setBufferedDeletesGen(packet.gen);
           }
-        } else {
-          if (infoStream != null) {
-            message("flush: drop buffered deletes: no segments");
+      flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
           }
-          // We can safely discard these deletes: since
-          // there are no segments, the deletions cannot
-          // affect anything.
         }
-      } else if (segmentInfo != null) {
-        segmentInfo.setBufferedDeletesGen(delGen);
+
+  private synchronized final void maybePushPendingDeletes() {
+    final long delGen = bufferedDeletesStream.getNextGen();
+    if (pendingDeletes.any()) {
+      bufferedDeletesStream.push(new FrozenBufferedDeletes(pendingDeletes, delGen));
+      pendingDeletes.clear();
       }
     }
-  }
 
   final boolean flushAllThreads(final boolean flushDeletes)
     throws IOException {
 
-    if (flushDeletes) {
-      synchronized (this) {
-        pushDeletes(null, pendingDeletes);
-        pendingDeletes = new BufferedDeletes(false);
-      }
-    }
-
     Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
     boolean anythingFlushed = false;
 
     while (threadsIterator.hasNext()) {
-      SegmentInfo newSegment = null;
-      BufferedDeletes segmentDeletes = null;
-      BitVector deletedDocs = null;
+      FlushedSegment newSegment = null;
 
       ThreadState perThread = threadsIterator.next();
       perThread.lock();
@@ -520,34 +482,24 @@ final class DocumentsWriter {
           newSegment = dwpt.flush();
 
           if (newSegment != null) {
-            anythingFlushed = true;
-            deletedDocs = dwpt.flushState.deletedDocs;
             perThreadPool.clearThreadBindings(perThread);
-            if (dwpt.pendingDeletes.any()) {
-              segmentDeletes = dwpt.pendingDeletes;
-              dwpt.pendingDeletes = new BufferedDeletes(false);
             }
           }
-        } else if (flushDeletes && dwpt.pendingDeletes.any()) {
-          segmentDeletes = dwpt.pendingDeletes;
-          dwpt.pendingDeletes = new BufferedDeletes(false);
-        }
       } finally {
         perThread.unlock();
       }
 
-      if (segmentDeletes != null) {
-          pushDeletes(newSegment, segmentDeletes);
-      }
-
-
       if (newSegment != null) {
-        // important do unlock the perThread before finishFlushedSegment
-        // is called to prevent deadlock on IndexWriter mutex
-        indexWriter.addFlushedSegment(newSegment, deletedDocs);
+        anythingFlushed = true;
+        finishFlushedSegment(newSegment);
       }
     }
 
+    if (!anythingFlushed && flushDeletes) {
+      maybePushPendingDeletes();
+    }
+
+
     return anythingFlushed;
   }
 

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Fri Feb 25 07:15:28 2011
@@ -101,6 +101,26 @@ public class DocumentsWriterPerThread {
     public boolean testPoint(String name) {
       return docWriter.writer.testPoint(name);
     }
+
+    public void clear() {
+      // don't hold onto doc nor analyzer, in case it is
+      // largish:
+      doc = null;
+      analyzer = null;
+  }
+  }
+
+  static class FlushedSegment {
+    final SegmentInfo segmentInfo;
+    final BufferedDeletes segmentDeletes;
+    final BitVector deletedDocuments;
+
+    private FlushedSegment(SegmentInfo segmentInfo,
+        BufferedDeletes segmentDeletes, BitVector deletedDocuments) {
+      this.segmentInfo = segmentInfo;
+      this.segmentDeletes = segmentDeletes;
+      this.deletedDocuments = deletedDocuments;
+    }
   }
 
   /** Called if we hit an exception at a bad time (when
@@ -136,7 +156,6 @@ public class DocumentsWriterPerThread {
   final Directory directory;
   final DocState docState;
   final DocConsumer consumer;
-  private DocFieldProcessor docFieldProcessor;
 
   String segment;                         // Current segment we are working on
   boolean aborting;               // True if an abort is pending
@@ -160,10 +179,7 @@ public class DocumentsWriterPerThread {
     this.docState.similarityProvider = parent.indexWriter.getConfig().getSimilarityProvider();
 
     consumer = indexingChain.getChain(this);
-    if (consumer instanceof DocFieldProcessor) {
-      docFieldProcessor = (DocFieldProcessor) consumer;
     }
-  }
 
   void setAborting() {
     aborting = true;
@@ -175,7 +191,7 @@ public class DocumentsWriterPerThread {
     docState.analyzer = analyzer;
     docState.docID = numDocsInRAM;
     if (delTerm != null) {
-      pendingDeletes.addTerm(delTerm, docState.docID);
+      pendingDeletes.addTerm(delTerm, numDocsInRAM);
     }
 
     if (segment == null) {
@@ -186,7 +202,11 @@ public class DocumentsWriterPerThread {
 
     boolean success = false;
     try {
+      try {
       consumer.processDocument(fieldInfos);
+      } finally {
+        docState.clear();
+      }
 
       success = true;
     } finally {
@@ -230,16 +250,20 @@ public class DocumentsWriterPerThread {
   }
 
   void deleteQueries(Query... queries) {
+    if (numDocsInRAM > 0) {
     for (Query query : queries) {
       pendingDeletes.addQuery(query, numDocsInRAM);
     }
   }
+  }
 
   void deleteTerms(Term... terms) {
+    if (numDocsInRAM > 0) {
     for (Term term : terms) {
       pendingDeletes.addTerm(term, numDocsInRAM);
     }
   }
+  }
 
   int getNumDocsInRAM() {
     return numDocsInRAM;
@@ -254,12 +278,12 @@ public class DocumentsWriterPerThread {
     segment = null;
     consumer.doAfterFlush();
     fieldInfos = fieldInfos.newFieldInfosWithGlobalFieldNumberMap();
-    parent.substractFlushedNumDocs(numDocsInRAM);
+    parent.subtractFlushedNumDocs(numDocsInRAM);
     numDocsInRAM = 0;
   }
 
   /** Flush all pending docs to a new segment */
-  SegmentInfo flush() throws IOException {
+  FlushedSegment flush() throws IOException {
     assert numDocsInRAM > 0;
 
     flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
@@ -295,6 +319,7 @@ public class DocumentsWriterPerThread {
 
       SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, flushState.segmentCodecs, fieldInfos);
       consumer.flush(flushState);
+      pendingDeletes.terms.clear();
       newSegment.clearFilesCache();
 
       if (infoStream != null) {
@@ -305,11 +330,19 @@ public class DocumentsWriterPerThread {
       }
       flushedDocCount += flushState.numDocs;
 
+      BufferedDeletes segmentDeletes = null;
+      if (pendingDeletes.queries.isEmpty()) {
+        pendingDeletes.clear();
+      } else {
+        segmentDeletes = pendingDeletes;
+        pendingDeletes = new BufferedDeletes(false);
+      }
+
       doAfterFlush();
 
       success = true;
 
-      return newSegment;
+      return new FlushedSegment(newSegment, segmentDeletes, flushState.deletedDocs);
     } finally {
       if (!success) {
         if (segment != null) {

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Fri Feb 25 07:15:28 2011
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
 import org.apache.lucene.index.codecs.CodecProvider;
@@ -2080,8 +2081,10 @@ public class IndexWriter implements Clos
     deleter.checkpoint(segmentInfos, false);
   }
 
-  void addFlushedSegment(SegmentInfo newSegment, BitVector deletedDocs) throws IOException {
-    assert newSegment != null;
+  void addFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+    assert flushedSegment != null;
+
+    SegmentInfo newSegment = flushedSegment.segmentInfo;
 
     setDiagnostics(newSegment, "flush");
 
@@ -2107,8 +2110,8 @@ public class IndexWriter implements Clos
 
       // Must write deleted docs after the CFS so we don't
       // slurp the del file into CFS:
-      if (deletedDocs != null) {
-        final int delCount = deletedDocs.count();
+      if (flushedSegment.deletedDocuments != null) {
+        final int delCount = flushedSegment.deletedDocuments.count();
         assert delCount > 0;
         newSegment.setDelCount(delCount);
         newSegment.advanceDelGen();
@@ -2123,7 +2126,7 @@ public class IndexWriter implements Clos
           // shortly-to-be-opened SegmentReader and let it
           // carry the changes; there's no reason to use
           // filesystem as intermediary here.
-          deletedDocs.write(directory, delFileName);
+          flushedSegment.deletedDocuments.write(directory, delFileName);
           success2 = true;
         } finally {
           if (!success2) {

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java Fri Feb 25 07:15:28 2011
@@ -71,7 +71,6 @@ final class TermVectorsTermsWriterPerFie
 
     if (doVectors) {
       termsWriter.hasVectors = true;
-      if (termsWriter.tvx != null) {
         if (termsHashPerField.bytesHash.size() != 0) {
           // Only necessary if previous doc hit a
           // non-aborting exception while writing vectors in
@@ -79,7 +78,6 @@ final class TermVectorsTermsWriterPerFie
           termsHashPerField.reset();
         }
       }
-    }
 
     // TODO: only if needed for performance
     //perThread.postingsCount = 0;

Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java Fri Feb 25 07:15:28 2011
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.*;
+import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.*;
 import org.apache.lucene.util.*;
 import org.junit.Test;
@@ -48,7 +49,21 @@ public class TestRollingUpdates extends 
         id++;
       }
       doc.getField("id").setValue(myID);
-      w.updateDocument(new Term("id", myID), doc);
+      int mode = docIter % 3;
+      switch (mode) {
+        case 0: {
+          w.deleteDocuments(new Term("id", myID));
+          w.addDocument(doc);
+          break;
+        }
+        case 1: {
+          w.deleteDocuments(new TermQuery(new Term("id", myID)));
+          w.addDocument(doc);
+          break;
+        }
+        default : w.updateDocument(new Term("id", myID), doc);
+      }
+
 
       if (docIter >= SIZE && random.nextInt(50) == 17) {
         if (r != null) {