You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/02/12 16:44:41 UTC

svn commit: r1243268 - in /lucene/dev/trunk/lucene/core/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/

Author: simonw
Date: Sun Feb 12 15:44:41 2012
New Revision: 1243268

URL: http://svn.apache.org/viewvc?rev=1243268&view=rev
Log:
LUCENE-3773: minor improvements to DWPTThreadPool

Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Sun Feb 12 15:44:41 2012
@@ -19,7 +19,6 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -213,18 +212,18 @@ final class DocumentsWriter {
         infoStream.message("DW", "abort");
       }
 
-      final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
-      while (threadsIterator.hasNext()) {
-        final ThreadState perThread = threadsIterator.next();
+      final int limit = perThreadPool.getActiveThreadState();
+      for (int i = 0; i < limit; i++) {
+        final ThreadState perThread = perThreadPool.getThreadState(i);
         perThread.lock();
         try {
           if (perThread.isActive()) { // we might be closed
             try {
-              perThread.perThread.abort();
+              perThread.dwpt.abort();
             } catch (IOException ex) {
               // continue
             } finally {
-              perThread.perThread.checkAndResetHasAborted();
+              perThread.dwpt.checkAndResetHasAborted();
               flushControl.doOnAbort(perThread);
             }
           } else {
@@ -338,7 +337,7 @@ final class DocumentsWriter {
         assert false: "perThread is not active but we are still open";
       }
        
-      final DocumentsWriterPerThread dwpt = perThread.perThread;
+      final DocumentsWriterPerThread dwpt = perThread.dwpt;
       try {
         final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
         numDocsInRAM.addAndGet(docCount);
@@ -372,7 +371,7 @@ final class DocumentsWriter {
         assert false: "perThread is not active but we are still open";
       }
        
-      final DocumentsWriterPerThread dwpt = perThread.perThread;
+      final DocumentsWriterPerThread dwpt = perThread.dwpt;
       try {
         dwpt.updateDocument(doc, analyzer, delTerm); 
         numDocsInRAM.incrementAndGet();
@@ -587,22 +586,4 @@ final class DocumentsWriter {
     }
     
   }
-
-  
- 
-  
-  // use by IW during close to assert all DWPT are inactive after final flush
-  boolean assertNoActiveDWPT() {
-    Iterator<ThreadState> activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator();
-    while(activePerThreadsIterator.hasNext()) {
-      ThreadState next = activePerThreadsIterator.next();
-      next.lock();
-      try {
-        assert !next.isActive();
-      } finally {
-        next.unlock();
-      }
-    }
-    return true;
-  }
 }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Sun Feb 12 15:44:41 2012
@@ -18,8 +18,8 @@ package org.apache.lucene.index;
  */
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.IdentityHashMap;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -51,6 +51,8 @@ public final class DocumentsWriterFlushC
   private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
   // only for safety reasons if a DWPT is close to the RAM limit
   private final Queue<BlockedFlush> blockedFlushes = new LinkedList<BlockedFlush>();
+  private final IdentityHashMap<DocumentsWriterPerThread, Long> flushingWriters = new IdentityHashMap<DocumentsWriterPerThread, Long>();
+
 
   double maxConfiguredRamBuffer = 0;
   long peakActiveBytes = 0;// only with assert
@@ -61,7 +63,6 @@ public final class DocumentsWriterFlushC
   private final DocumentsWriterPerThreadPool perThreadPool;
   private final FlushPolicy flushPolicy;
   private boolean closed = false;
-  private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>();
   private final DocumentsWriter documentsWriter;
   private final IndexWriterConfig config;
 
@@ -122,7 +123,7 @@ public final class DocumentsWriterFlushC
   }
 
   private void commitPerThreadBytes(ThreadState perThread) {
-    final long delta = perThread.perThread.bytesUsed()
+    final long delta = perThread.dwpt.bytesUsed()
         - perThread.bytesUsed;
     perThread.bytesUsed += delta;
     /*
@@ -212,7 +213,7 @@ public final class DocumentsWriterFlushC
    */
   public synchronized void setFlushPending(ThreadState perThread) {
     assert !perThread.flushPending;
-    if (perThread.perThread.getNumDocsInRAM() > 0) {
+    if (perThread.dwpt.getNumDocsInRAM() > 0) {
       perThread.flushPending = true; // write access synced
       final long bytes = perThread.bytesUsed;
       flushBytes += bytes;
@@ -295,18 +296,21 @@ public final class DocumentsWriterFlushC
   }
 
   DocumentsWriterPerThread nextPendingFlush() {
+    int numPending;
+    boolean fullFlush;
     synchronized (this) {
       final DocumentsWriterPerThread poll;
       if ((poll = flushQueue.poll()) != null) {
         stallControl.updateStalled(this);
         return poll;
       }
+      fullFlush = this.fullFlush;
+      numPending = this.numPending;
     }
     if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
-      final Iterator<ThreadState> allActiveThreads = perThreadPool
-          .getActivePerThreadsIterator();
-      while (allActiveThreads.hasNext() && numPending > 0) {
-        ThreadState next = allActiveThreads.next();
+      final int limit = perThreadPool.getActiveThreadState();
+      for (int i = 0; i < limit && numPending > 0; i++) {
+        final ThreadState next = perThreadPool.getThreadState(i);
         if (next.flushPending) {
           final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
           if (dwpt != null) {
@@ -327,9 +331,29 @@ public final class DocumentsWriterFlushC
   /**
    * Returns an iterator that provides access to all currently active {@link ThreadState}s 
    */
-  public Iterator<ThreadState> allActiveThreads() {
-    return perThreadPool.getActivePerThreadsIterator();
+  public Iterator<ThreadState> allActiveThreadStates() {
+    return getPerThreadsIterator(perThreadPool.getActiveThreadState());
   }
+  
+  private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
+    return new Iterator<ThreadState>() {
+      int i = 0;
+
+      public boolean hasNext() {
+        return i < upto;
+      }
+
+      public ThreadState next() {
+        return perThreadPool.getThreadState(i++);
+      }
+
+      public void remove() {
+        throw new UnsupportedOperationException("remove() not supported.");
+      }
+    };
+  }
+
+  
 
   synchronized void doOnDelete() {
     // pass null this is a global delete no update
@@ -369,7 +393,7 @@ public final class DocumentsWriterFlushC
     boolean success = false;
     try {
       if (perThread.isActive()
-          && perThread.perThread.deleteQueue != documentsWriter.deleteQueue) {
+          && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
         // There is a flush-all in process and this DWPT is
         // now stale -- enroll it for flush and try for
         // another DWPT:
@@ -397,23 +421,23 @@ public final class DocumentsWriterFlushC
       DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
       documentsWriter.deleteQueue = newQueue;
     }
-    final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
-    while (allActiveThreads.hasNext()) {
-      final ThreadState next = allActiveThreads.next();
+    final int limit = perThreadPool.getActiveThreadState();
+    for (int i = 0; i < limit; i++) {
+      final ThreadState next = perThreadPool.getThreadState(i);
       next.lock();
       try {
         if (!next.isActive()) {
           continue; 
         }
-        assert next.perThread.deleteQueue == flushingQueue
-            || next.perThread.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
+        assert next.dwpt.deleteQueue == flushingQueue
+            || next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
             + flushingQueue
             + " currentqueue: "
             + documentsWriter.deleteQueue
             + " perThread queue: "
-            + next.perThread.deleteQueue
-            + " numDocsInRam: " + next.perThread.getNumDocsInRAM();
-        if (next.perThread.deleteQueue != flushingQueue) {
+            + next.dwpt.deleteQueue
+            + " numDocsInRam: " + next.dwpt.getNumDocsInRAM();
+        if (next.dwpt.deleteQueue != flushingQueue) {
           // this one is already a new DWPT
           continue;
         }
@@ -437,12 +461,12 @@ public final class DocumentsWriterFlushC
   }
   
   private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
-    final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
-    while (allActiveThreads.hasNext()) {
-      final ThreadState next = allActiveThreads.next();
+    final int limit = perThreadPool.getActiveThreadState();
+    for (int i = 0; i < limit; i++) {
+      final ThreadState next = perThreadPool.getThreadState(i);
       next.lock();
       try {
-        assert !next.isActive() || next.perThread.deleteQueue == queue;
+        assert !next.isActive() || next.dwpt.deleteQueue == queue;
       } finally {
         next.unlock();
       }
@@ -454,9 +478,9 @@ public final class DocumentsWriterFlushC
 
   void addFlushableState(ThreadState perThread) {
     if (documentsWriter.infoStream.isEnabled("DWFC")) {
-      documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread);
+      documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.dwpt);
     }
-    final DocumentsWriterPerThread dwpt = perThread.perThread;
+    final DocumentsWriterPerThread dwpt = perThread.dwpt;
     assert perThread.isHeldByCurrentThread();
     assert perThread.isActive();
     assert fullFlush;
@@ -473,9 +497,9 @@ public final class DocumentsWriterFlushC
       }
     } else {
       if (closed) {
-        perThread.resetWriter(null); // make this state inactive
+        perThreadPool.deactivateThreadState(perThread); // make this state inactive
       } else {
-        dwpt.initialize();
+        perThreadPool.reinitThreadState(perThread);
       }
     }
   }
@@ -597,4 +621,6 @@ public final class DocumentsWriterFlushC
   boolean anyStalledThreads() {
     return stallControl.anyStalledThreads();
   }
+  
+  
 }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Sun Feb 12 15:44:41 2012
@@ -16,7 +16,6 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import java.util.Iterator;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
@@ -38,11 +37,6 @@ import org.apache.lucene.util.SetOnce;
  * </p>
  */
 public abstract class DocumentsWriterPerThreadPool {
-  /** The maximum number of simultaneous threads that may be
-   *  indexing documents at once in IndexWriter; if more
-   *  than this many threads arrive they will wait for
-   *  others to finish. */
-  public final static int DEFAULT_MAX_THREAD_STATES = 8;
   
   /**
    * {@link ThreadState} references and guards a
@@ -57,17 +51,18 @@ public abstract class DocumentsWriterPer
    */
   @SuppressWarnings("serial")
   public final static class ThreadState extends ReentrantLock {
-    // package private for FlushPolicy
-    DocumentsWriterPerThread perThread;
+    DocumentsWriterPerThread dwpt;
+    // TODO this should really be part of DocumentsWriterFlushControl
     // write access guarded by DocumentsWriterFlushControl
     volatile boolean flushPending = false;
+    // TODO this should really be part of DocumentsWriterFlushControl
     // write access guarded by DocumentsWriterFlushControl
     long bytesUsed = 0;
     // guarded by Reentrant lock
     private boolean isActive = true;
 
-    ThreadState(DocumentsWriterPerThread perThread) {
-      this.perThread = perThread;
+    ThreadState(DocumentsWriterPerThread dpwt) {
+      this.dwpt = dpwt;
     }
     
     /**
@@ -76,12 +71,12 @@ public abstract class DocumentsWriterPer
      * for indexing anymore.
      * @see #isActive()  
      */
-    void resetWriter(DocumentsWriterPerThread perThread) {
+    private void resetWriter(DocumentsWriterPerThread dwpt) {
       assert this.isHeldByCurrentThread();
-      if (perThread == null) {
+      if (dwpt == null) {
         isActive = false;
       }
-      this.perThread = perThread;
+      this.dwpt = dwpt;
       this.bytesUsed = 0;
       this.flushPending = false;
     }
@@ -112,7 +107,7 @@ public abstract class DocumentsWriterPer
     public DocumentsWriterPerThread getDocumentsWriterPerThread() {
       assert this.isHeldByCurrentThread();
       // public for FlushPolicy
-      return perThread;
+      return dwpt;
     }
     
     /**
@@ -124,40 +119,37 @@ public abstract class DocumentsWriterPer
     }
   }
 
-  private final ThreadState[] perThreads;
+  private final ThreadState[] threadStates;
   private volatile int numThreadStatesActive;
-  private FieldNumberBiMap globalFieldMap;
+  private final SetOnce<FieldNumberBiMap> globalFieldMap = new SetOnce<FieldNumberBiMap>();
   private final SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
   
   /**
-   * Creates a new {@link DocumentsWriterPerThreadPool} with max.
-   * {@link #DEFAULT_MAX_THREAD_STATES} thread states.
+   * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
    */
-  public DocumentsWriterPerThreadPool() {
-    this(DEFAULT_MAX_THREAD_STATES);
-  }
-
-  public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
-    maxNumPerThreads = (maxNumPerThreads < 1) ? DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
-    perThreads = new ThreadState[maxNumPerThreads];
+  public DocumentsWriterPerThreadPool(int maxNumThreadStates) {
+    if (maxNumThreadStates < 1) {
+      throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
+    }
+    threadStates = new ThreadState[maxNumThreadStates];
     numThreadStatesActive = 0;
   }
 
   public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
     this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
-    this.globalFieldMap = globalFieldMap;
-    for (int i = 0; i < perThreads.length; i++) {
+    this.globalFieldMap.set(globalFieldMap);
+    for (int i = 0; i < threadStates.length; i++) {
       final FieldInfos infos = new FieldInfos(globalFieldMap);
-      perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
+      threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
     }
   }
-
+  
   /**
    * Returns the max number of {@link ThreadState} instances available in this
    * {@link DocumentsWriterPerThreadPool}
    */
   public int getMaxThreadStates() {
-    return perThreads.length;
+    return threadStates.length;
   }
   
   /**
@@ -178,16 +170,16 @@ public abstract class DocumentsWriterPer
    *         <code>null</code>
    */
   public synchronized ThreadState newThreadState() {
-    if (numThreadStatesActive < perThreads.length) {
-      final ThreadState threadState = perThreads[numThreadStatesActive];
+    if (numThreadStatesActive < threadStates.length) {
+      final ThreadState threadState = threadStates[numThreadStatesActive];
       threadState.lock(); // lock so nobody else will get this ThreadState
       boolean unlock = true;
       try {
         if (threadState.isActive()) {
           // unreleased thread states are deactivated during DW#close()
           numThreadStatesActive++; // increment will publish the ThreadState
-          assert threadState.perThread != null;
-          threadState.perThread.initialize();
+          assert threadState.dwpt != null;
+          threadState.dwpt.initialize();
           unlock = false;
           return threadState;
         }
@@ -205,12 +197,12 @@ public abstract class DocumentsWriterPer
   }
   
   private synchronized boolean assertUnreleasedThreadStatesInactive() {
-    for (int i = numThreadStatesActive; i < perThreads.length; i++) {
-      assert perThreads[i].tryLock() : "unreleased threadstate should not be locked";
+    for (int i = numThreadStatesActive; i < threadStates.length; i++) {
+      assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
       try {
-        assert !perThreads[i].isActive() : "expected unreleased thread state to be inactive";
+        assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
       } finally {
-        perThreads[i].unlock();
+        threadStates[i].unlock();
       }
     }
     return true;
@@ -220,8 +212,8 @@ public abstract class DocumentsWriterPer
    * Deactivate all unreleased threadstates 
    */
   protected synchronized void deactivateUnreleasedStates() {
-    for (int i = numThreadStatesActive; i < perThreads.length; i++) {
-      final ThreadState threadState = perThreads[i];
+    for (int i = numThreadStatesActive; i < threadStates.length; i++) {
+      final ThreadState threadState = threadStates[i];
       threadState.lock();
       try {
         threadState.resetWriter(null);
@@ -233,9 +225,10 @@ public abstract class DocumentsWriterPer
   
   protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
     assert threadState.isHeldByCurrentThread();
-    final DocumentsWriterPerThread dwpt = threadState.perThread;
+    assert globalFieldMap.get() != null;
+    final DocumentsWriterPerThread dwpt = threadState.dwpt;
     if (!closed) {
-      final FieldInfos infos = new FieldInfos(globalFieldMap);
+      final FieldInfos infos = new FieldInfos(globalFieldMap.get());
       final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
       newDwpt.initialize();
       threadState.resetWriter(newDwpt);
@@ -251,45 +244,19 @@ public abstract class DocumentsWriterPer
   
   public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
 
+  
   /**
-   * Returns an iterator providing access to all {@link ThreadState}
-   * instances. 
-   */
-  // TODO: new Iterator per indexed doc is overkill...?
-  public Iterator<ThreadState> getAllPerThreadsIterator() {
-    return getPerThreadsIterator(this.perThreads.length);
-  }
-
-  /**
-   * Returns an iterator providing access to all active {@link ThreadState}
-   * instances. 
-   * <p>
-   * Note: The returned iterator will only iterator
-   * {@link ThreadState}s that are active at the point in time when this method
-   * has been called.
+   * Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
+   * given ord.
    * 
+   * @param ord
+   *          the ordinal of the {@link ThreadState}
+   * @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
+   *         given ord.
    */
-  // TODO: new Iterator per indexed doc is overkill...?
-  public Iterator<ThreadState> getActivePerThreadsIterator() {
-    return getPerThreadsIterator(numThreadStatesActive);
-  }
-
-  private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
-    return new Iterator<ThreadState>() {
-      int i = 0;
-
-      public boolean hasNext() {
-        return i < upto;
-      }
-
-      public ThreadState next() {
-        return perThreads[i++];
-      }
-
-      public void remove() {
-        throw new UnsupportedOperationException("remove() not supported.");
-      }
-    };
+  ThreadState getThreadState(int ord) {
+    assert ord < numThreadStatesActive;
+    return threadStates[ord];
   }
 
   /**
@@ -299,14 +266,59 @@ public abstract class DocumentsWriterPer
    */
   protected ThreadState minContendedThreadState() {
     ThreadState minThreadState = null;
-    // TODO: new Iterator per indexed doc is overkill...?
-    final Iterator<ThreadState> it = getActivePerThreadsIterator();
-    while (it.hasNext()) {
-      final ThreadState state = it.next();
+    final int limit = numThreadStatesActive;
+    for (int i = 0; i < limit; i++) {
+      final ThreadState state = threadStates[i];
       if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
         minThreadState = state;
       }
     }
     return minThreadState;
   }
+  
+  /**
+   * Returns the number of currently deactivated {@link ThreadState} instances.
+   * A deactivated {@link ThreadState} should not be used for indexing anymore.
+   * 
+   * @return the number of currently deactivated {@link ThreadState} instances.
+   */
+  int numDeactivatedThreadStates() {
+    int count = 0;
+    for (int i = 0; i < threadStates.length; i++) {
+      final ThreadState threadState = threadStates[i];
+      threadState.lock();
+      try {
+       if (!threadState.isActive) {
+         count++;
+       }
+      } finally {
+        threadState.unlock();
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Deactivates an active {@link ThreadState}. A deactivated {@link ThreadState}
+   * cannot be used for indexing anymore. This method should only be used
+   * if the parent {@link DocumentsWriter} is closed or aborted.
+   * 
+   * @param threadState the state to deactivate
+   */
+  void deactivateThreadState(ThreadState threadState) {
+    assert threadState.isActive();
+    threadState.resetWriter(null);
+  }
+
+  /**
+   * Reinitializes an active {@link ThreadState}. A {@link ThreadState} should
+   * only be reinitialized if it is active without any pending documents.
+   * 
+   * @param threadState the state to reinitialize
+   */
+  void reinitThreadState(ThreadState threadState) {
+    assert threadState.isActive;
+    assert threadState.dwpt.getNumDocsInRAM() == 0;
+    threadState.dwpt.initialize();
+  }
 }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java Sun Feb 12 15:44:41 2012
@@ -72,7 +72,7 @@ public class FlushByRamOrCountsPolicy ex
   @Override
   public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
     if (flushOnDocCount()
-        && state.perThread.getNumDocsInRAM() >= indexWriterConfig
+        && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
             .getMaxBufferedDocs()) {
       // Flush this state by num docs
       control.setFlushPending(state);

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java Sun Feb 12 15:44:41 2012
@@ -75,9 +75,7 @@ public abstract class FlushPolicy {
    */
   public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) {
     onInsert(control, state);
-    if (!state.flushPending) {
-      onDelete(control, state);
-    }
+    onDelete(control, state);
   }
 
   /**
@@ -107,17 +105,17 @@ public abstract class FlushPolicy {
    */
   protected ThreadState findLargestNonPendingWriter(
       DocumentsWriterFlushControl control, ThreadState perThreadState) {
-    assert perThreadState.perThread.getNumDocsInRAM() > 0;
+    assert perThreadState.dwpt.getNumDocsInRAM() > 0;
     long maxRamSoFar = perThreadState.bytesUsed;
     // the dwpt which needs to be flushed eventually
     ThreadState maxRamUsingThreadState = perThreadState;
     assert !perThreadState.flushPending : "DWPT should have flushed";
-    Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreads();
+    Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates();
     while (activePerThreadsIterator.hasNext()) {
       ThreadState next = activePerThreadsIterator.next();
       if (!next.flushPending) {
         final long nextRam = next.bytesUsed;
-        if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) {
+        if (nextRam > maxRamSoFar && next.dwpt.getNumDocsInRAM() > 0) {
           maxRamSoFar = nextRam;
           maxRamUsingThreadState = next;
         }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Sun Feb 12 15:44:41 2012
@@ -1144,7 +1144,7 @@ public class IndexWriter implements Clos
       synchronized(this) {
         closed = true;
       }
-      assert oldWriter.assertNoActiveDWPT();
+      assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates();
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "closeInternal");
     } finally {

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java Sun Feb 12 15:44:41 2012
@@ -94,6 +94,13 @@ public final class IndexWriterConfig imp
 
   /** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
   public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
+  
+  /** The maximum number of simultaneous threads that may be
+   *  indexing documents at once in IndexWriter; if more
+   *  than this many threads arrive they will wait for
+   *  others to finish. Default value is 8. */
+  public final static int DEFAULT_MAX_THREAD_STATES = 8;
+  
   /**
    * Sets the default (for any instance) maximum time to wait for a write lock
    * (in milliseconds).
@@ -172,7 +179,7 @@ public final class IndexWriterConfig imp
     }
     flushPolicy = new FlushByRamOrCountsPolicy();
     readerPooling = DEFAULT_READER_POOLING;
-    indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool();
+    indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES);
     readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
     perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
   }
@@ -554,8 +561,8 @@ public final class IndexWriterConfig imp
    * IndexWriter to assign thread-states to incoming indexing threads. If no
    * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
    * {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
-   * thread-states set to {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see
-   * {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}).
+   * thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see
+   * {@link #DEFAULT_MAX_THREAD_STATES}).
    * </p>
    * <p>
    * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Sun Feb 12 15:44:41 2012
@@ -34,13 +34,8 @@ public class ThreadAffinityDocumentsWrit
   private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
   
   /**
-   * Creates a new {@link DocumentsWriterPerThreadPool} with max.
-   * {@link #DEFAULT_MAX_THREAD_STATES} thread states.
+   * Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
    */
-  public ThreadAffinityDocumentsWriterThreadPool() {
-    this(DEFAULT_MAX_THREAD_STATES);
-  }
-  
   public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
     super(maxNumPerThreads);
     assert getMaxThreadStates() >= 1;

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1243268&r1=1243267&r2=1243268&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Sun Feb 12 15:44:41 2012
@@ -281,10 +281,10 @@ public class TestFlushByRamOrCountsPolic
   }
 
   protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
-    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
+    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
     long bytesUsed = 0;
     while (allActiveThreads.hasNext()) {
-      bytesUsed += allActiveThreads.next().perThread.bytesUsed();
+      bytesUsed += allActiveThreads.next().dwpt.bytesUsed();
     }
     assertEquals(bytesUsed, flushControl.activeBytes());
   }
@@ -343,7 +343,7 @@ public class TestFlushByRamOrCountsPolic
       if (state.flushPending) {
         toFlush = state;
       } else if (flushOnDeleteTerms()
-          && state.perThread.pendingDeletes.numTermDeletes.get() >= indexWriterConfig
+          && state.dwpt.pendingDeletes.numTermDeletes.get() >= indexWriterConfig
               .getMaxBufferedDeleteTerms()) {
         toFlush = state;
       } else {
@@ -376,7 +376,7 @@ public class TestFlushByRamOrCountsPolic
       if (state.flushPending) {
         toFlush = state;
       } else if (flushOnDocCount()
-          && state.perThread.getNumDocsInRAM() >= indexWriterConfig
+          && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
               .getMaxBufferedDocs()) {
         toFlush = state;
       } else if (flushOnRAM()
@@ -397,7 +397,7 @@ public class TestFlushByRamOrCountsPolic
         hasMarkedPending = true;
       } else {
         peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
-        peakDocCountWithoutFlush = Math.max(state.perThread.getNumDocsInRAM(),
+        peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(),
             peakDocCountWithoutFlush);
       }
 
@@ -409,7 +409,7 @@ public class TestFlushByRamOrCountsPolic
 
   static void findPending(DocumentsWriterFlushControl flushControl,
       ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) {
-    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
+    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
     while (allActiveThreads.hasNext()) {
       ThreadState next = allActiveThreads.next();
       if (next.flushPending) {