Posted to commits@lucene.apache.org by bu...@apache.org on 2011/01/13 09:49:22 UTC

svn commit: r1058461 - /lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/

Author: buschmi
Date: Thu Jan 13 08:49:21 2011
New Revision: 1058461

URL: http://svn.apache.org/viewvc?rev=1058461&view=rev
Log:
LUCENE-2324: More progress with concurrency and per-thread pool

Added:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
Removed:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
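
The core of this change: indexing threads no longer hand work to
DocumentsWriterThreadPool via task callbacks. Each thread now checks out a
ThreadState (a ReentrantLock that owns one DocumentsWriterPerThread), indexes
under that lock, and releases it. A minimal sketch of the discipline, assuming
perThreadPool, documentsWriter, doc and analyzer are in scope (illustrative,
not committed code):

    ThreadState state = perThreadPool.getAndLock(Thread.currentThread(), documentsWriter, doc);
    try {
      state.perThread.addDocument(doc, analyzer);  // index while holding the state's lock
    } finally {
      state.unlock();                              // always release the ThreadState
    }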

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Jan 13 08:49:21 2011
@@ -20,6 +20,7 @@ package org.apache.lucene.index;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Similarity;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -107,7 +109,7 @@ final class DocumentsWriter {
   int numDocsInStore;                     // # docs written to doc stores
 
   boolean bufferIsFull;                   // True when it's time to write segment
-  private boolean closed;
+  private volatile boolean closed;
 
   PrintStream infoStream;
   int maxFieldLength = IndexWriterConfig.UNLIMITED_FIELD_LENGTH;
@@ -115,12 +117,10 @@ final class DocumentsWriter {
 
   List<String> newFiles;
 
-  private final DocumentsWriterThreadPool threadPool;
   final IndexWriter indexWriter;
 
   private AtomicInteger numDocsInRAM = new AtomicInteger(0);
   private AtomicLong ramUsed = new AtomicLong(0);
-  private int numDocumentsWriterPerThreads;
 
   static class DocState {
     DocumentsWriter docWriter;
@@ -160,54 +160,71 @@ final class DocumentsWriter {
   private final FieldInfos fieldInfos;
 
   final BufferedDeletes bufferedDeletes;
+  final SegmentDeletes pendingDeletes;
   private final IndexWriter.FlushControl flushControl;
-  private final IndexingChain chain;
+  final IndexingChain chain;
 
-  DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
+  final DocumentsWriterPerThreadPool perThreadPool;
+
+  DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterPerThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
     this.directory = directory;
     this.indexWriter = writer;
     this.similarity = writer.getConfig().getSimilarity();
     this.fieldInfos = fieldInfos;
     this.bufferedDeletes = bufferedDeletes;
-    this.threadPool = indexerThreadPool;
+    this.perThreadPool = indexerThreadPool;
+    this.pendingDeletes = new SegmentDeletes();
     this.chain = chain;
     flushControl = writer.flushControl;
+    this.perThreadPool.initialize(this);
   }
 
-  boolean deleteQueries(Query... queries) {
-    final boolean doFlush = flushControl.waitUpdate(0, queries.length);
-    Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
-    while (it.hasNext()) {
-      it.next().deleteQueries(queries);
+  boolean deleteQueries(final Query... queries) throws IOException {
+    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+
+    boolean added = false;
+    while (threadsIterator.hasNext()) {
+      threadsIterator.next().perThread.deleteQueries(queries);
+      added = true;
     }
-    return doFlush;
-  }
 
-  boolean deleteQuery(Query query) {
-    final boolean doFlush = flushControl.waitUpdate(0, 1);
-    Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
-    while (it.hasNext()) {
-      it.next().deleteQuery(query);
+    if (!added) {
+      synchronized(this) {
+        for (Query query : queries) {
+          pendingDeletes.addQuery(query, SegmentDeletes.MAX_INT);
+        }
+      }
     }
-    return doFlush;
+
+    return true;
   }
 
-  boolean deleteTerms(Term... terms) {
-    final boolean doFlush = flushControl.waitUpdate(0, terms.length);
-    Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
-    while (it.hasNext()) {
-      it.next().deleteTerms(terms);
-    }
-    return doFlush;
+  boolean deleteQuery(final Query query) throws IOException {
+    return deleteQueries(query);
   }
 
-  boolean deleteTerm(Term term, boolean skipWait) {
-    final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
-    Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
-    while (it.hasNext()) {
-      it.next().deleteTerm(term);
+  boolean deleteTerms(final Term... terms) throws IOException {
+    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+
+    boolean added = false;
+    while (threadsIterator.hasNext()) {
+      threadsIterator.next().perThread.deleteTerms(terms);
+      added = true;
     }
-    return doFlush;
+
+    if (!added) {
+      synchronized(this) {
+        for (Term term : terms) {
+          pendingDeletes.addTerm(term, SegmentDeletes.MAX_INT);
+        }
+      }
+    }
+
+    return false;
+  }
+
+  boolean deleteTerm(final Term term) throws IOException {
+    return deleteTerms(term);
   }
 
   public FieldInfos getFieldInfos() {
@@ -224,25 +241,26 @@ final class DocumentsWriter {
    *  here. */
   synchronized void setInfoStream(PrintStream infoStream) {
     this.infoStream = infoStream;
-    Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
-    while (it.hasNext()) {
-      it.next().docState.infoStream = infoStream;
-    }
+    pushConfigChange();
   }
 
   synchronized void setMaxFieldLength(int maxFieldLength) {
     this.maxFieldLength = maxFieldLength;
-    Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
-    while (it.hasNext()) {
-      it.next().docState.maxFieldLength = maxFieldLength;
-    }
+    pushConfigChange();
   }
 
   synchronized void setSimilarity(Similarity similarity) {
     this.similarity = similarity;
-    Iterator<DocumentsWriterPerThread> it = threadPool.getPerThreadIterator();
+    pushConfigChange();
+  }
+
+  private final void pushConfigChange() {
+    Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
     while (it.hasNext()) {
-      it.next().docState.similarity = similarity;
+      DocumentsWriterPerThread perThread = it.next().perThread;
+      perThread.docState.infoStream = this.infoStream;
+      perThread.docState.maxFieldLength = this.maxFieldLength;
+      perThread.docState.similarity = this.similarity;
     }
   }
 
@@ -300,14 +318,25 @@ final class DocumentsWriter {
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
   synchronized void abort() throws IOException {
-    if (infoStream != null) {
-      message("docWriter: abort");
-    }
-
     boolean success = false;
+
     try {
+      if (infoStream != null) {
+        message("docWriter: abort");
+      }
+
+      Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+
+      while (threadsIterator.hasNext()) {
+        ThreadState perThread = threadsIterator.next();
+        perThread.lock();
+        try {
+          perThread.perThread.abort();
+        } finally {
+          perThread.unlock();
+        }
+      }
 
-      threadPool.abort();
       success = true;
     } finally {
       notifyAll();
@@ -324,16 +353,12 @@ final class DocumentsWriter {
   }
 
   // for testing
-  public SegmentDeletes getPendingDeletes() {
-    return null;
-    // nocommit
-    //return pendingDeletes;
+  public synchronized SegmentDeletes getPendingDeletes() {
+    return pendingDeletes;
   }
 
-  public boolean anyDeletions() {
-    // nocommit
-    return true;
-    //return pendingDeletes.any();
+  public synchronized boolean anyDeletions() {
+    return pendingDeletes.any();
   }
 
   synchronized void close() {
@@ -341,52 +366,54 @@ final class DocumentsWriter {
     notifyAll();
   }
 
-  synchronized DocumentsWriterPerThread newDocumentsWriterPerThread() {
-    DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(directory, this, chain);
-    numDocumentsWriterPerThreads++;
-    return perThread;
-  }
-
   boolean updateDocument(final Document doc, final Analyzer analyzer, final Term delTerm)
       throws CorruptIndexException, IOException {
+    ensureOpen();
 
-    boolean flushed = threadPool.executePerThread(this, doc,
-        new DocumentsWriterThreadPool.PerThreadTask<Boolean>() {
-          @Override
-          public Boolean process(final DocumentsWriterPerThread perThread) throws IOException {
-            long perThreadRAMUsedBeforeAdd = perThread.bytesUsed();
-            perThread.addDocument(doc, analyzer);
-
-            synchronized(DocumentsWriter.this) {
-              ensureOpen();
-              if (delTerm != null) {
-                deleteTerm(delTerm, true);
-              }
-              perThread.commitDocument();
-              numDocsInRAM.incrementAndGet();
-            }
-
-            if (finishAddDocument(perThread, perThreadRAMUsedBeforeAdd)) {
-              super.clearThreadBindings();
-              return true;
-            }
-            return false;
-          }
-        });
+    Collection<String> flushedFiles = null;
+    SegmentInfo newSegment = null;
+
+    ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
+    try {
+      DocumentsWriterPerThread dwpt = perThread.perThread;
+      long perThreadRAMUsedBeforeAdd = dwpt.bytesUsed();
+      dwpt.addDocument(doc, analyzer);
+
+      synchronized(DocumentsWriter.this) {
+        if (delTerm != null) {
+          deleteTerm(delTerm);
+        }
+        dwpt.commitDocument();
+        numDocsInRAM.incrementAndGet();
+      }
+
+      newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
+      if (newSegment != null) {
+        perThreadPool.clearThreadBindings(perThread);
+        flushedFiles = new HashSet<String>();
+        flushedFiles.addAll(dwpt.flushState.flushedFiles);
+      }
 
-    if (flushed) {
-      indexWriter.maybeMerge();
+    } finally {
+      perThread.unlock();
+    }
+
+    if (newSegment != null) {
+      finishFlushedSegment(newSegment, flushedFiles);
       return true;
     }
 
     return false;
   }
 
-  private final boolean finishAddDocument(DocumentsWriterPerThread perThread,
+  private final SegmentInfo finishAddDocument(DocumentsWriterPerThread perThread,
       long perThreadRAMUsedBeforeAdd) throws IOException {
+    SegmentInfo newSegment = null;
+
     int numDocsPerThread = perThread.getNumDocsInRAM();
-    boolean flushed = maybeFlushPerThread(perThread);
-    if (flushed) {
+    if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
+      newSegment = perThread.flush();
+
       int oldValue = numDocsInRAM.get();
       while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numDocsPerThread)) {
         oldValue = numDocsInRAM.get();
@@ -399,81 +426,73 @@ final class DocumentsWriter {
       oldValue = ramUsed.get();
     }
 
-    return flushed;
+    return newSegment;
   }
 
-  private boolean flushSegment(DocumentsWriterPerThread perThread) throws IOException {
-    if (perThread.getNumDocsInRAM() == 0) {
-      return false;
-    }
+  final boolean flushAllThreads(final boolean flushDeletes)
+    throws IOException {
 
-    SegmentInfo newSegment = perThread.flush();
-    newSegment.dir = indexWriter.getDirectory();
+    if (flushDeletes) {
+      if (indexWriter.segmentInfos.size() > 0 && pendingDeletes.any()) {
+        bufferedDeletes.pushDeletes(pendingDeletes, indexWriter.segmentInfos.lastElement(), true);
+        pendingDeletes.clear();
+      }
+    }
 
-    finishFlushedSegment(newSegment, perThread);
-    return true;
-  }
+    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+    boolean anythingFlushed = false;
 
-  private final boolean maybeFlushPerThread(DocumentsWriterPerThread perThread) throws IOException {
-    if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
-      flushSegment(perThread);
-      assert perThread.getNumDocsInRAM() == 0;
+    while (threadsIterator.hasNext()) {
+      Collection<String> flushedFiles = null;
+      SegmentInfo newSegment = null;
 
-      return true;
-    }
+      ThreadState perThread = threadsIterator.next();
+      perThread.lock();
+      try {
+        DocumentsWriterPerThread dwpt = perThread.perThread;
+        final int numDocs = dwpt.getNumDocsInRAM();
 
-    return false;
-  }
+        // Always flush docs if there are any
+        boolean flushDocs = numDocs > 0;
 
-  final boolean flushAllThreads(final boolean flushDeletes)
-    throws IOException {
+        String segment = dwpt.getSegment();
 
-    return threadPool.executeAllThreads(this, new DocumentsWriterThreadPool.AllThreadsTask<Boolean>() {
-      @Override
-      public Boolean process(Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException {
-        boolean anythingFlushed = false;
-
-        while (threadsIterator.hasNext()) {
-          DocumentsWriterPerThread perThread = threadsIterator.next();
-          final int numDocs = perThread.getNumDocsInRAM();
-
-          // Always flush docs if there are any
-          boolean flushDocs = numDocs > 0;
-
-          String segment = perThread.getSegment();
-
-          // If we are flushing docs, segment must not be null:
-          assert segment != null || !flushDocs;
-
-          if (flushDocs) {
-            SegmentInfo newSegment = perThread.flush();
-            newSegment.dir = indexWriter.getDirectory();
-
-            if (newSegment != null) {
-              anythingFlushed = true;
-
-              IndexWriter.setDiagnostics(newSegment, "flush");
-              finishFlushedSegment(newSegment, perThread);
-            }
-          } else if (flushDeletes) {
-            perThread.pushDeletes(null, indexWriter.segmentInfos);
+        // If we are flushing docs, segment must not be null:
+        assert segment != null || !flushDocs;
+
+        if (flushDocs) {
+          newSegment = dwpt.flush();
+
+          if (newSegment != null) {
+            IndexWriter.setDiagnostics(newSegment, "flush");
+            flushedFiles = new HashSet<String>();
+            flushedFiles.addAll(dwpt.flushState.flushedFiles);
+            dwpt.pushDeletes(newSegment, indexWriter.segmentInfos);
+            anythingFlushed = true;
+            perThreadPool.clearThreadBindings(perThread);
           }
+        } else if (flushDeletes) {
+          dwpt.pushDeletes(null, indexWriter.segmentInfos);
         }
+      } finally {
+        perThread.unlock();
+      }
 
-        if (anythingFlushed) {
-          clearThreadBindings();
-          numDocsInRAM.set(0);
-        }
-
-        return anythingFlushed;
+      if (newSegment != null) {
+        // important to unlock the perThread before finishFlushedSegment
+        // is called to prevent deadlock on IndexWriter mutex
+        finishFlushedSegment(newSegment, flushedFiles);
       }
-    });
+    }
+
+    numDocsInRAM.set(0);
+    return anythingFlushed;
   }
 
   /** Build compound file for the segment we just flushed */
-  void createCompoundFile(String compoundFileName, DocumentsWriterPerThread perThread) throws IOException {
+  void createCompoundFile(String compoundFileName, Collection<String> flushedFiles) throws IOException {
     CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
-    for(String fileName : perThread.flushState.flushedFiles) {
+    for(String fileName : flushedFiles) {
       cfsWriter.addFile(fileName);
     }
 
@@ -481,16 +500,14 @@ final class DocumentsWriter {
     cfsWriter.close();
   }
 
-  void finishFlushedSegment(SegmentInfo newSegment, DocumentsWriterPerThread perThread) throws IOException {
-    perThread.pushDeletes(newSegment, indexWriter.segmentInfos);
-
+  void finishFlushedSegment(SegmentInfo newSegment, Collection<String> flushedFiles) throws IOException {
     if (indexWriter.useCompoundFile(newSegment)) {
       String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
       message("creating compound file " + compoundFileName);
       // Now build compound file
       boolean success = false;
       try {
-        createCompoundFile(compoundFileName, perThread);
+        createCompoundFile(compoundFileName, flushedFiles);
         success = true;
       } finally {
         if (!success) {
@@ -501,14 +518,14 @@ final class DocumentsWriter {
 
           indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "",
               IndexFileNames.COMPOUND_FILE_EXTENSION));
-          for (String file : perThread.flushState.flushedFiles) {
+          for (String file : flushedFiles) {
             indexWriter.deleter.deleteFile(file);
           }
 
         }
       }
 
-      for (String file : perThread.flushState.flushedFiles) {
+      for (String file : flushedFiles) {
         indexWriter.deleter.deleteFile(file);
       }
 

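One ordering detail in the new updateDocument()/flushAllThreads() code above:
the flushed file names are copied out of the DWPT while the ThreadState lock is
still held, and finishFlushedSegment() runs only after unlock(), because it
takes the IndexWriter monitor. A condensed sketch of that ordering (same names
as the diff, locals illustrative):

    SegmentInfo newSegment = null;
    Collection<String> flushedFiles = null;

    perThread.lock();
    try {
      newSegment = perThread.perThread.flush();
      if (newSegment != null) {
        // snapshot the file names while we still hold the ThreadState lock
        flushedFiles = new HashSet<String>(perThread.perThread.flushState.flushedFiles);
      }
    } finally {
      perThread.unlock();
    }

    if (newSegment != null) {
      // after unlock(), so we never hold a ThreadState lock and the
      // IndexWriter mutex at the same time
      finishFlushedSegment(newSegment, flushedFiles);
    }
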
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Thu Jan 13 08:49:21 2011
@@ -109,7 +109,7 @@ public class DocumentsWriterPerThread {
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
   void abort() throws IOException {
-    assert aborting;
+    aborting = true;
     try {
       if (infoStream != null) {
         message("docWriter: now abort");
@@ -152,7 +152,6 @@ public class DocumentsWriterPerThread {
   FieldInfos fieldInfos = new FieldInfos();
 
   public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
-    parent.indexWriter.testPoint("DocumentsWriterPerThread.init start");
     this.directory = directory;
     this.parent = parent;
     this.writer = parent.indexWriter;
@@ -191,7 +190,7 @@ public class DocumentsWriterPerThread {
         if (!aborting) {
           // mark document as deleted
           deleteDocID(docState.docID);
-          commitDocument();
+          numDocsInRAM++;
         }
       }
     }
@@ -203,7 +202,7 @@ public class DocumentsWriterPerThread {
       success = true;
     } finally {
       if (!success) {
-        setAborting();
+        abort();
       }
     }
   }
@@ -249,23 +248,23 @@ public class DocumentsWriterPerThread {
     // confounding exception).
   }
 
-  void deleteQueries(Query... queries) {
+  synchronized void deleteQueries(Query... queries) {
     for (Query query : queries) {
       pendingDeletes.addQuery(query, numDocsInRAM);
     }
   }
 
-  void deleteQuery(Query query) {
+  synchronized void deleteQuery(Query query) {
     pendingDeletes.addQuery(query, numDocsInRAM);
   }
 
-  void deleteTerms(Term... terms) {
+  synchronized void deleteTerms(Term... terms) {
     for (Term term : terms) {
       pendingDeletes.addTerm(term, numDocsInRAM);
     }
   }
 
-  void deleteTerm(Term term) {
+  synchronized void deleteTerm(Term term) {
     pendingDeletes.addTerm(term, numDocsInRAM);
   }
 
@@ -350,7 +349,7 @@ public class DocumentsWriterPerThread {
       return newSegment;
     } finally {
       if (!success) {
-        setAborting();
+        abort();
       }
     }
   }

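The delete*() methods above become synchronized because deletes can now arrive
from any thread while the owning thread is indexing into the same DWPT. Each
buffered delete records the current numDocsInRAM as its upper bound, so it only
applies to documents added before it arrived. A toy illustration of that
bookkeeping (hypothetical java.util.Map-based fragment, not the real
SegmentDeletes internals):

    private final Map<Term, Integer> bufferedTerms = new HashMap<Term, Integer>();
    private int numDocsInRAM;

    synchronized void bufferDeleteTerm(Term term) {
      // the delete may only affect docIDs < numDocsInRAM as of this call
      Integer prev = bufferedTerms.get(term);
      if (prev == null || numDocsInRAM > prev.intValue()) {
        bufferedTerms.put(term, Integer.valueOf(numDocsInRAM));
      }
    }
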
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1058461&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Thu Jan 13 08:49:21 2011
@@ -0,0 +1,81 @@
+package org.apache.lucene.index;
+
+import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.document.Document;
+
+public abstract class DocumentsWriterPerThreadPool {
+  final static class ThreadState extends ReentrantLock {
+    final DocumentsWriterPerThread perThread;
+
+    ThreadState(DocumentsWriterPerThread perThread) {
+      this.perThread = perThread;
+    }
+  }
+
+  private final ThreadState[] perThreads;
+  private volatile int numThreadStatesActive;
+
+  public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
+    maxNumPerThreads = (maxNumPerThreads < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
+    this.perThreads = new ThreadState[maxNumPerThreads];
+
+    numThreadStatesActive = 0;
+  }
+
+  public void initialize(DocumentsWriter documentsWriter) {
+    for (int i = 0; i < perThreads.length; i++) {
+      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, documentsWriter.chain));
+    }
+  }
+
+  public int getMaxThreadStates() {
+    return perThreads.length;
+  }
+
+  public ThreadState newThreadState() {
+    if (numThreadStatesActive < perThreads.length) {
+      ThreadState state = perThreads[numThreadStatesActive];
+      numThreadStatesActive++;
+      return state;
+    }
+
+    return null;
+  }
+
+  public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
+
+  public abstract void clearThreadBindings(ThreadState perThread);
+
+  public abstract void clearAllThreadBindings();
+
+  public Iterator<ThreadState> getAllPerThreadsIterator() {
+    return getPerThreadsIterator(this.perThreads.length);
+  }
+
+  public Iterator<ThreadState> getActivePerThreadsIterator() {
+    return getPerThreadsIterator(this.numThreadStatesActive);
+  }
+
+  private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
+    return new Iterator<ThreadState>() {
+      int i = 0;
+
+      @Override
+      public boolean hasNext() {
+        return i < upto;
+      }
+
+      @Override
+      public ThreadState next() {
+        return perThreads[i++];
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("remove() not supported.");
+      }
+    };
+  }
+}

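getAndLock() is the only policy decision subclasses must make; the base class
handles state storage and activation. A minimal hypothetical subclass showing
the contract: it ignores affinity, takes any free state, activates a new one if
capacity remains, and otherwise blocks on the first state (sketch only, not
part of this commit):

    package org.apache.lucene.index;

    import java.util.Iterator;

    import org.apache.lucene.document.Document;

    public class SimpleDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
      public SimpleDocumentsWriterThreadPool(int maxNumPerThreads) {
        super(maxNumPerThreads);
      }

      @Override
      public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) {
        // grab any active state that is free right now
        Iterator<ThreadState> it = getActivePerThreadsIterator();
        while (it.hasNext()) {
          ThreadState state = it.next();
          if (state.tryLock()) {
            return state;
          }
        }
        // all active states are busy: activate a fresh one if capacity remains
        ThreadState state = newThreadState();
        if (state == null) {
          state = getActivePerThreadsIterator().next(); // pool exhausted: block on the first
        }
        state.lock();
        return state;
      }

      @Override
      public void clearThreadBindings(ThreadState perThread) {}

      @Override
      public void clearAllThreadBindings() {}
    }
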
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Thu Jan 13 08:49:21 2011
@@ -1275,7 +1275,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerm(term, false)) {
+      if (docWriter.deleteTerm(term)) {
         flush(true, false);
       }
     } catch (OutOfMemoryError oom) {
@@ -1396,10 +1396,11 @@ public class IndexWriter implements Clos
   public void updateDocument(Term term, Document doc, Analyzer analyzer)
       throws CorruptIndexException, IOException {
     ensureOpen();
+    boolean maybeMerge = false;
     try {
       boolean success = false;
       try {
-        docWriter.updateDocument(doc, analyzer, term);
+        maybeMerge = docWriter.updateDocument(doc, analyzer, term);
         success = true;
       } finally {
         if (!success && infoStream != null)
@@ -1408,6 +1409,10 @@ public class IndexWriter implements Clos
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "updateDocument");
     }
+
+    if (maybeMerge) {
+      maybeMerge();
+    }
   }
 
   // for test purpose

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Thu Jan 13 08:49:21 2011
@@ -31,12 +31,12 @@ import org.apache.lucene.util.Version;
  * <p>
  * All setter methods return {@link IndexWriterConfig} to allow chaining
  * settings conveniently. Thus someone can do:
- * 
+ *
  * <pre>
  * IndexWriterConfig conf = new IndexWriterConfig(analyzer);
  * conf.setter1().setter2();
  * </pre>
- * 
+ *
  * @since 3.1
  */
 public final class IndexWriterConfig implements Cloneable {
@@ -53,7 +53,7 @@ public final class IndexWriterConfig imp
    * </ul>
    */
   public static enum OpenMode { CREATE, APPEND, CREATE_OR_APPEND }
-  
+
   /** Default value is 32. Change using {@link #setTermIndexInterval(int)}. */
   public static final int DEFAULT_TERM_INDEX_INTERVAL = 32;
 
@@ -74,7 +74,7 @@ public final class IndexWriterConfig imp
 
   /**
    * Default value for the write lock timeout (1,000 ms).
-   * 
+   *
    * @see #setDefaultWriteLockTimeout(long)
    */
   public static long WRITE_LOCK_TIMEOUT = 1000;
@@ -102,7 +102,7 @@ public final class IndexWriterConfig imp
   /**
    * Returns the default write lock timeout for newly instantiated
    * IndexWriterConfigs.
-   * 
+   *
    * @see #setDefaultWriteLockTimeout(long)
    */
   public static long getDefaultWriteLockTimeout() {
@@ -126,9 +126,9 @@ public final class IndexWriterConfig imp
   private CodecProvider codecProvider;
   private MergePolicy mergePolicy;
   private boolean readerPooling;
-  private DocumentsWriterThreadPool indexerThreadPool;
+  private DocumentsWriterPerThreadPool indexerThreadPool;
   private int readerTermsIndexDivisor;
-  
+
   // required for clone
   private Version matchVersion;
 
@@ -161,7 +161,7 @@ public final class IndexWriterConfig imp
     indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES);
     readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
   }
-  
+
   @Override
   public Object clone() {
     // Shallow clone is the only thing that's possible, since parameters like
@@ -184,7 +184,7 @@ public final class IndexWriterConfig imp
     this.openMode = openMode;
     return this;
   }
-  
+
   /** Returns the {@link OpenMode} set by {@link #setOpenMode(OpenMode)}. */
   public OpenMode getOpenMode() {
     return openMode;
@@ -243,7 +243,7 @@ public final class IndexWriterConfig imp
   /**
    * Returns the maximum number of terms that will be indexed for a single field
    * in a document.
-   * 
+   *
    * @see #setMaxFieldLength(int)
    */
   public int getMaxFieldLength() {
@@ -273,7 +273,7 @@ public final class IndexWriterConfig imp
    * <p>
    * <b>NOTE:</b> the similarity cannot be null. If <code>null</code> is passed,
    * the similarity will be set to the default.
-   * 
+   *
    * @see Similarity#setDefault(Similarity)
    */
   public IndexWriterConfig setSimilarity(Similarity similarity) {
@@ -289,7 +289,7 @@ public final class IndexWriterConfig imp
   public Similarity getSimilarity() {
     return similarity;
   }
-  
+
   /**
    * Expert: set the interval between indexed terms. Large values cause less
    * memory to be used by IndexReader, but slow random-access to terms. Small
@@ -309,7 +309,7 @@ public final class IndexWriterConfig imp
    * In particular, <code>numUniqueTerms/interval</code> terms are read into
    * memory by an IndexReader, and, on average, <code>interval/2</code> terms
    * must be scanned for each random term access.
-   * 
+   *
    * @see #DEFAULT_TERM_INDEX_INTERVAL
    */
   public IndexWriterConfig setTermIndexInterval(int interval) {
@@ -319,7 +319,7 @@ public final class IndexWriterConfig imp
 
   /**
    * Returns the interval between indexed terms.
-   * 
+   *
    * @see #setTermIndexInterval(int)
    */
   public int getTermIndexInterval() {
@@ -355,10 +355,10 @@ public final class IndexWriterConfig imp
     this.writeLockTimeout = writeLockTimeout;
     return this;
   }
-  
+
   /**
    * Returns allowed timeout when acquiring the write lock.
-   * 
+   *
    * @see #setWriteLockTimeout(long)
    */
   public long getWriteLockTimeout() {
@@ -372,7 +372,7 @@ public final class IndexWriterConfig imp
    * created.
 
    * <p>Disabled by default (writer flushes by RAM usage).
-   * 
+   *
    * @throws IllegalArgumentException if maxBufferedDeleteTerms
    * is enabled but smaller than 1
    * @see #setRAMBufferSizeMB
@@ -389,7 +389,7 @@ public final class IndexWriterConfig imp
   /**
    * Returns the number of buffered deleted terms that will trigger a flush if
    * enabled.
-   * 
+   *
    * @see #setMaxBufferedDeleteTerms(int)
    */
   public int getMaxBufferedDeleteTerms() {
@@ -401,33 +401,33 @@ public final class IndexWriterConfig imp
    * and deletions before they are flushed to the Directory. Generally for
    * faster indexing performance it's best to flush by RAM usage instead of
    * document count and use as large a RAM buffer as you can.
-   * 
+   *
    * <p>
    * When this is set, the writer will flush whenever buffered documents and
    * deletions use this much RAM. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent
    * triggering a flush due to RAM usage. Note that if flushing by document
    * count is also enabled, then the flush will be triggered by whichever comes
    * first.
-   * 
+   *
    * <p>
    * <b>NOTE</b>: the account of RAM usage for pending deletions is only
    * approximate. Specifically, if you delete by Query, Lucene currently has no
    * way to measure the RAM usage of individual Queries so the accounting will
    * under-estimate and you should compensate by either calling commit()
    * periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
-   * to flush by count instead of RAM usage (each buffered delete Query counts 
+   * to flush by count instead of RAM usage (each buffered delete Query counts
    * as one).
-   * 
+   *
    * <p>
    * <b>NOTE</b>: because IndexWriter uses <code>int</code>s when managing its
    * internal storage, the absolute maximum value for this setting is somewhat
    * less than 2048 MB. The precise limit depends on various factors, such as
    * how large your documents are, how many fields have norms, etc., so it's
    * best to set this value comfortably under 2048.
-   * 
+   *
    * <p>
    * The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
-   * 
+   *
    * @throws IllegalArgumentException
    *           if ramBufferSize is enabled but non-positive, or it disables
    *           ramBufferSize when maxBufferedDocs is already disabled
@@ -456,19 +456,19 @@ public final class IndexWriterConfig imp
    * Determines the minimal number of documents required before the buffered
    * in-memory documents are flushed as a new Segment. Large values generally
    * give faster indexing.
-   * 
+   *
    * <p>
    * When this is set, the writer will flush every maxBufferedDocs added
    * documents. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent triggering a
    * flush due to number of buffered documents. Note that if flushing by RAM
    * usage is also enabled, then the flush will be triggered by whichever comes
    * first.
-   * 
+   *
    * <p>
    * Disabled by default (writer flushes by RAM usage).
-   * 
+   *
    * @see #setRAMBufferSizeMB(double)
-   * 
+   *
    * @throws IllegalArgumentException
    *           if maxBufferedDocs is enabled but smaller than 2, or it disables
    *           maxBufferedDocs when ramBufferSize is already disabled
@@ -488,7 +488,7 @@ public final class IndexWriterConfig imp
   /**
    * Returns the number of buffered added documents that will trigger a flush if
    * enabled.
-   * 
+   *
    * @see #setMaxBufferedDocs(int)
    */
   public int getMaxBufferedDocs() {
@@ -529,10 +529,10 @@ public final class IndexWriterConfig imp
     return codecProvider;
   }
 
-  
+
   /**
    * Returns the current MergePolicy in use by this writer.
-   * 
+   *
    * @see #setMergePolicy(MergePolicy)
    */
   public MergePolicy getMergePolicy() {
@@ -545,15 +545,15 @@ public final class IndexWriterConfig imp
    * <code>maxThreadStates</code> will be set to
    * {@link #DEFAULT_MAX_THREAD_STATES}.
    */
-  public IndexWriterConfig setIndexerThreadPool(DocumentsWriterThreadPool threadPool) {
+  public IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
     this.indexerThreadPool = threadPool;
     return this;
   }
 
-  public DocumentsWriterThreadPool getIndexerThreadPool() {
+  public DocumentsWriterPerThreadPool getIndexerThreadPool() {
     return this.indexerThreadPool;
   }
-  
+
   /** Returns the max number of simultaneous threads that
    *  may be indexing documents at once in IndexWriter. */
   public int getMaxThreadStates() {
@@ -584,7 +584,7 @@ public final class IndexWriterConfig imp
     this.indexingChain = indexingChain == null ? DocumentsWriterPerThread.defaultIndexingChain : indexingChain;
     return this;
   }
-  
+
   /** Returns the indexing chain set on {@link #setIndexingChain(IndexingChain)}. */
   IndexingChain getIndexingChain() {
     return indexingChain;
@@ -606,7 +606,7 @@ public final class IndexWriterConfig imp
   public int getReaderTermsIndexDivisor() {
     return readerTermsIndexDivisor;
   }
-  
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

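On the user side the only visible change is the pool type. Wiring in a custom
pool, in the chaining style from the class javadoc above (constructor form as
in that javadoc; the pool size 4 is an arbitrary example):

    IndexWriterConfig conf = new IndexWriterConfig(analyzer)
        .setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(4));
    IndexWriter writer = new IndexWriter(directory, conf);
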
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1058461&r1=1058460&r2=1058461&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Thu Jan 13 08:49:21 2011
@@ -1,66 +1,56 @@
 package org.apache.lucene.index;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.document.Document;
 
-public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterThreadPool {
-  private static final class AffinityThreadState extends ThreadState {
-    int numAssignedThreads;
-
-    @Override
-    void finish() {
-      numAssignedThreads--;
-    }
-  }
+public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
+  private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
 
-  private Map<Thread, AffinityThreadState> threadBindings = new HashMap<Thread, AffinityThreadState>();
-
-  public ThreadAffinityDocumentsWriterThreadPool(int maxNumThreadStates) {
-    super(maxNumThreadStates);
+  public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
+    super(maxNumPerThreads);
   }
 
   @Override
-  protected ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) {
-    AffinityThreadState threadState = threadBindings.get(requestingThread);
-    // First, find a thread state.  If this thread already
-    // has affinity to a specific ThreadState, use that one
-    // again.
-    if (threadState == null) {
-      AffinityThreadState minThreadState = null;
-      for(int i=0;i<allThreadStates.length;i++) {
-        AffinityThreadState ts = (AffinityThreadState) allThreadStates[i];
-        if (minThreadState == null || ts.numAssignedThreads < minThreadState.numAssignedThreads)
-          minThreadState = ts;
+  public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc) {
+    ThreadState threadState = threadBindings.get(requestingThread);
+    if (threadState != null) {
+      if (threadState.tryLock()) {
+        return threadState;
       }
-      if (minThreadState != null && (minThreadState.numAssignedThreads == 0 || allThreadStates.length >= maxNumThreadStates)) {
-        threadState = minThreadState;
-      } else {
-        threadState = addNewThreadState(documentsWriter, new AffinityThreadState());
+    }
+
+    // find the state with the minimum number of waiting threads
+    Iterator<ThreadState> it = getActivePerThreadsIterator();
+    ThreadState minThreadState = null;
+    while (it.hasNext()) {
+      ThreadState state = it.next();
+      if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
+        minThreadState = state;
       }
-      threadBindings.put(requestingThread, threadState);
     }
-    threadState.numAssignedThreads++;
 
-    return threadState;
+    if (minThreadState == null || minThreadState.hasQueuedThreads()) {
+      ThreadState newState = newThreadState();
+      if (newState != null) {
+        minThreadState = newState;
+        threadBindings.put(requestingThread, newState);
+      }
+    }
+
+    minThreadState.lock();
+    return minThreadState;
   }
 
   @Override
-  protected void clearThreadBindings(ThreadState flushedThread) {
-    Iterator<Entry<Thread, AffinityThreadState>> it = threadBindings.entrySet().iterator();
-    while (it.hasNext()) {
-      Entry<Thread, AffinityThreadState> e = it.next();
-      if (e.getValue() == flushedThread) {
-        it.remove();
-      }
-    }
+  public void clearThreadBindings(ThreadState perThread) {
+    threadBindings.clear();
   }
 
   @Override
-  protected void clearAllThreadBindings() {
+  public void clearAllThreadBindings() {
     threadBindings.clear();
   }
 }
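
The selection logic above relies on two introspection methods of
java.util.concurrent.locks.ReentrantLock: getQueueLength(), an estimate of the
number of waiting threads, and hasQueuedThreads(). A self-contained sketch of
the underlying idea, picking the least-contended lock from a set (illustrative
helper, not part of this commit):

    import java.util.concurrent.locks.ReentrantLock;

    class LeastContendedPicker {
      /** Returns the lock with the (estimated) shortest wait queue. */
      static ReentrantLock pickLeastContended(ReentrantLock[] locks) {
        ReentrantLock min = locks[0];
        for (ReentrantLock lock : locks) {
          // getQueueLength() is only an estimate, which is fine for balancing
          if (lock.getQueueLength() < min.getQueueLength()) {
            min = lock;
          }
        }
        return min;
      }
    }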