You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/04/11 10:39:44 UTC

[lucene-solr] branch branch_8x updated: LUCENE-9304: Refactor DWPTPool to pool DWPT directly (#1397)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 83b34d4  LUCENE-9304: Refactor DWPTPool to pool DWPT directly (#1397)
83b34d4 is described below

commit 83b34d47900febbd8b5e337a1277b5696c536f78
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Sat Apr 11 12:23:46 2020 +0200

    LUCENE-9304: Refactor DWPTPool to pool DWPT directly (#1397)
    
    This change removes the ThreadState indirection from DWPTPool and pools DWPT directly. The tracking information and locking semantics are mostly moved to DWPT directly, and the pool semantics have changed slightly such that a DWPT needs to be checked out of the pool once it needs to be flushed or aborted. This automatically grows and shrinks the number of DWPTs in the system when the number of threads grows or shrinks. Access to pooled DWPTs is more straightforward and doesn't require an ordinal. [...]
    This allowed for removal of indirections in DWPTFlushControl like BlockedFlush, the removal of DWPTPool setter and getter in IndexWriterConfig and the addition of stronger assertions in DWPT and DW.
---
 lucene/CHANGES.txt                                 |   7 +
 .../org/apache/lucene/index/DocumentsWriter.java   | 176 ++++-----
 .../lucene/index/DocumentsWriterFlushControl.java  | 432 +++++++++------------
 .../lucene/index/DocumentsWriterPerThread.java     |  86 +++-
 .../lucene/index/DocumentsWriterPerThreadPool.java | 291 ++++++--------
 .../lucene/index/DocumentsWriterStallControl.java  |   5 +-
 .../lucene/index/FlushByRamOrCountsPolicy.java     |  26 +-
 .../java/org/apache/lucene/index/FlushPolicy.java  |  38 +-
 .../java/org/apache/lucene/index/IndexWriter.java  |   3 +-
 .../org/apache/lucene/index/IndexWriterConfig.java |  22 --
 .../apache/lucene/index/LiveIndexWriterConfig.java |  16 -
 .../lucene/index/TestFlushByRamOrCountsPolicy.java |  82 ++--
 .../org/apache/lucene/index/TestIndexWriter.java   |  76 ++--
 .../apache/lucene/index/TestIndexWriterConfig.java |   1 -
 .../apache/lucene/index/TestIndexWriterDelete.java |   4 +-
 .../lucene/index/TestIndexWriterOnDiskFull.java    |   4 +-
 .../lucene/index/TestIndexWriterWithThreads.java   |   2 +-
 .../org/apache/lucene/index/RandomIndexWriter.java |   4 +-
 .../apache/lucene/store/MockDirectoryWrapper.java  |  18 +-
 19 files changed, 604 insertions(+), 689 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 1875ca0..e3694d3 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -10,6 +10,10 @@ API Changes
 
 * LUCENE-9265: SimpleFSDirectory is deprecated in favor of NIOFSDirectory. (Yannick Welsch)
 
+* LUCENE-9304: Removed ability to set DocumentsWriterPerThreadPool on IndexWriterConfig.
+  The DocumentsWriterPerThreadPool is a package-private final class which made it impossible
+  to customize. (Simon Willnauer)
+
 New Features
 ---------------------
 (No changes)
@@ -24,6 +28,9 @@ Improvements
 * LUCENE-8050: PerFieldDocValuesFormat should not get the DocValuesFormat on a field that has no doc values.
   (David Smiley, Juan Rodriguez)
 
+* LUCENE-9304: Removed ThreadState abstraction from DocumentsWriter which allows pooling of DWPT directly and
+  improves the approachability of the IndexWriter code. (Simon Willnauer)
+
 Optimizations
 ---------------------
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index aa4016c..965c10d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -32,7 +32,6 @@ import java.util.function.ToLongFunction;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
@@ -59,21 +58,25 @@ import org.apache.lucene.util.InfoStream;
  * Threads:
  *
  * Multiple threads are allowed into addDocument at once.
- * There is an initial synchronized call to getThreadState
- * which allocates a ThreadState for this thread.  The same
- * thread will get the same ThreadState over time (thread
- * affinity) so that if there are consistent patterns (for
- * example each thread is indexing a different content
- * source) then we make better use of RAM.  Then
- * processDocument is called on that ThreadState without
+ * There is an initial synchronized call to
+ * {@link DocumentsWriterFlushControl#obtainAndLock()}
+ * which allocates a DWPT for this indexing thread. The same
+ * thread will not necessarily get the same DWPT over time.
+ * Then updateDocuments is called on that DWPT without
  * synchronization (most of the "heavy lifting" is in this
- * call).  Finally the synchronized "finishDocument" is
- * called to flush changes to the directory.
+ * call). Once a DWPT fills up enough RAM or holds enough
+ * documents in memory the DWPT is checked out for flush
+ * and all changes are written to the directory. Each DWPT
+ * corresponds to one segment being written.
  *
- * When flush is called by IndexWriter we forcefully idle
- * all threads and flush only once they are all idle.  This
- * means you can call flush with a given thread even while
- * other threads are actively adding/deleting documents.
+ * When flush is called by IndexWriter we check out all DWPTs
+ * that are associated with the current {@link DocumentsWriterDeleteQueue}
+ * out of the {@link DocumentsWriterPerThreadPool} and write
+ * them to disk. The flush process can piggy-back on incoming
+ * indexing threads or even block them from adding documents
+ * if flushing can't keep up with new documents being added.
+ * Unless the stall control kicks in to block indexing threads,
+ * flushes happen concurrently with actual indexing requests.
  *
  *
  * Exceptions:
@@ -99,13 +102,8 @@ import org.apache.lucene.util.InfoStream;
  */
 
 final class DocumentsWriter implements Closeable, Accountable {
-  private final Directory directoryOrig; // no wrapping, for infos
-  private final Directory directory;
-  private final FieldInfos.FieldNumbers globalFieldNumberMap;
-  private final int indexCreatedVersionMajor;
   private final AtomicLong pendingNumDocs;
-  private final boolean enableTestPoints;
-  private final Supplier<String> segmentNameSupplier;
+
   private final FlushNotifications flushNotifications;
 
   private volatile boolean closed;
@@ -130,24 +128,23 @@ final class DocumentsWriter implements Closeable, Accountable {
   final DocumentsWriterPerThreadPool perThreadPool;
   final FlushPolicy flushPolicy;
   final DocumentsWriterFlushControl flushControl;
-  private long lastSeqNo;
-  
+
   DocumentsWriter(FlushNotifications flushNotifications, int indexCreatedVersionMajor, AtomicLong pendingNumDocs, boolean enableTestPoints,
                   Supplier<String> segmentNameSupplier, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory,
                   FieldInfos.FieldNumbers globalFieldNumberMap) {
-    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
-    this.directoryOrig = directoryOrig;
-    this.directory = directory;
     this.config = config;
     this.infoStream = config.getInfoStream();
     this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
-    this.perThreadPool = config.getIndexerThreadPool();
+    this.perThreadPool = new DocumentsWriterPerThreadPool(() -> {
+      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
+      return new DocumentsWriterPerThread(indexCreatedVersionMajor,
+          segmentNameSupplier.get(), directoryOrig,
+          directory, config, infoStream, deleteQueue, infos,
+          pendingNumDocs, enableTestPoints);
+    });
     flushPolicy = config.getFlushPolicy();
-    this.globalFieldNumberMap = globalFieldNumberMap;
     this.pendingNumDocs = pendingNumDocs;
     flushControl = new DocumentsWriterFlushControl(this, config);
-    this.segmentNameSupplier = segmentNameSupplier;
-    this.enableTestPoints = enableTestPoints;
     this.flushNotifications = flushNotifications;
   }
   
@@ -155,9 +152,6 @@ final class DocumentsWriter implements Closeable, Accountable {
     return applyDeleteOrUpdate(q -> q.addDelete(queries));
   }
 
-  void setLastSeqNo(long seqNo) {
-    lastSeqNo = seqNo;
-  }
 
   long deleteTerms(final Term... terms) throws IOException {
     return applyDeleteOrUpdate(q -> q.addDelete(terms));
@@ -173,13 +167,12 @@ final class DocumentsWriter implements Closeable, Accountable {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
     long seqNo = function.applyAsLong(deleteQueue);
     flushControl.doOnDelete();
-    lastSeqNo = Math.max(lastSeqNo, seqNo);
     if (applyAllDeletes()) {
       seqNo = -seqNo;
     }
     return seqNo;
   }
-  
+
   /** If buffered deletes are using too much heap, resolve them and write disk and return true. */
   private boolean applyAllDeletes() throws IOException {
     final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
@@ -225,20 +218,23 @@ final class DocumentsWriter implements Closeable, Accountable {
       if (infoStream.isEnabled("DW")) {
         infoStream.message("DW", "abort");
       }
-      final int limit = perThreadPool.getActiveThreadStateCount();
-      for (int i = 0; i < limit; i++) {
-        final ThreadState perThread = perThreadPool.getThreadState(i);
-        perThread.lock();
+      for (final DocumentsWriterPerThread perThread : perThreadPool.filterAndLock(x -> true)) {
         try {
-          abortThreadState(perThread);
+          abortDocumentsWriterPerThread(perThread);
         } finally {
           perThread.unlock();
         }
       }
       flushControl.abortPendingFlushes();
       flushControl.waitForFlush();
+      assert perThreadPool.size() == 0
+          : "There are still active DWPT in the pool: " + perThreadPool.size();
       success = true;
     } finally {
+      if (success) {
+        assert flushControl.getFlushingBytes() == 0 : "flushingBytes has unexpected value 0 != " + flushControl.getFlushingBytes();
+        assert flushControl.netBytes() == 0 : "netBytes has unexpected value 0 != " + flushControl.netBytes();
+      }
       if (infoStream.isEnabled("DW")) {
         infoStream.message("DW", "done abort success=" + success);
       }
@@ -273,33 +269,34 @@ final class DocumentsWriter implements Closeable, Accountable {
         pendingNumDocs.addAndGet(-ticket.getFlushedSegment().segmentInfo.info.maxDoc());
       }
     });
-    List<ThreadState> threadStates = new ArrayList<>();
+    List<DocumentsWriterPerThread> writers = new ArrayList<>();
     AtomicBoolean released = new AtomicBoolean(false);
     final Closeable release = () -> {
+      // we return this closure to unlock all writers once done
+      // or if we hit an exception below in the try block.
+      // we can't assign this later otherwise the ref can't be final
       if (released.compareAndSet(false, true)) { // only once
         if (infoStream.isEnabled("DW")) {
           infoStream.message("DW", "unlockAllAbortedThread");
         }
-        perThreadPool.unlockNewThreadStates();
-        for (ThreadState state : threadStates) {
-          state.unlock();
+        perThreadPool.unlockNewWriters();
+        for (DocumentsWriterPerThread writer : writers) {
+          writer.unlock();
         }
       }
     };
     try {
       deleteQueue.clear();
-      perThreadPool.lockNewThreadStates();
-      final int limit = perThreadPool.getMaxThreadStates();
-      for (int i = 0; i < limit; i++) {
-        final ThreadState perThread = perThreadPool.getThreadState(i);
-        perThread.lock();
-        threadStates.add(perThread);
-        abortThreadState(perThread);
+      perThreadPool.lockNewWriters();
+      writers.addAll(perThreadPool.filterAndLock(x -> true));
+      for (final DocumentsWriterPerThread perThread : writers) {
+        assert perThread.isHeldByCurrentThread();
+        abortDocumentsWriterPerThread(perThread);
       }
       deleteQueue.clear();
 
       // jump over any possible in flight ops:
-      deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount() + 1);
+      deleteQueue.skipSequenceNumbers(perThreadPool.size() + 1);
 
       flushControl.abortPendingFlushes();
       flushControl.waitForFlush();
@@ -322,35 +319,22 @@ final class DocumentsWriter implements Closeable, Accountable {
   }
   
   /** Returns how many documents were aborted. */
-  private int abortThreadState(final ThreadState perThread) throws IOException {
+  private void abortDocumentsWriterPerThread(final DocumentsWriterPerThread perThread) throws IOException {
     assert perThread.isHeldByCurrentThread();
-    if (perThread.isInitialized()) { 
-      try {
-        int abortedDocCount = perThread.dwpt.getNumDocsInRAM();
-        subtractFlushedNumDocs(abortedDocCount);
-        perThread.dwpt.abort();
-        return abortedDocCount;
-      } finally {
-        flushControl.doOnAbort(perThread);
-      }
-    } else {
+    try {
+      subtractFlushedNumDocs(perThread.getNumDocsInRAM());
+      perThread.abort();
+    } finally {
       flushControl.doOnAbort(perThread);
-      // This DWPT was never initialized so it has no indexed documents:
-      return 0;
     }
   }
 
   /** returns the maximum sequence number for all previously completed operations */
-  public long getMaxCompletedSequenceNumber() {
-    long value = lastSeqNo;
-    int limit = perThreadPool.getMaxThreadStates();
-    for(int i = 0; i < limit; i++) {
-      ThreadState perThread = perThreadPool.getThreadState(i);
-      value = Math.max(value, perThread.lastSeqNo);
-    }
-    return value;
+  long getMaxCompletedSequenceNumber() {
+    return deleteQueue.getLastSequenceNumber();
   }
 
+
   boolean anyChanges() {
     /*
      * changes are either in a DWPT or in the deleteQueue.
@@ -369,23 +353,23 @@ final class DocumentsWriter implements Closeable, Accountable {
     return anyChanges;
   }
   
-  public int getBufferedDeleteTermsSize() {
+  int getBufferedDeleteTermsSize() {
     return deleteQueue.getBufferedUpdatesTermsSize();
   }
 
   //for testing
-  public int getNumBufferedDeleteTerms() {
+  int getNumBufferedDeleteTerms() {
     return deleteQueue.numGlobalTermDeletes();
   }
 
-  public boolean anyDeletions() {
+  boolean anyDeletions() {
     return deleteQueue.anyChanges();
   }
 
   @Override
-  public void close() {
+  public void close() throws IOException {
     closed = true;
-    flushControl.setClosed();
+    IOUtils.close(flushControl, perThreadPool);
   }
 
   private boolean preUpdate() throws IOException {
@@ -421,37 +405,25 @@ final class DocumentsWriter implements Closeable, Accountable {
 
     return hasEvents;
   }
-  
-  private void ensureInitialized(ThreadState state) throws IOException {
-    if (state.dwpt == null) {
-      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
-      state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig,
-                                                directory, config, infoStream, deleteQueue, infos,
-                                                pendingNumDocs, enableTestPoints);
-    }
-  }
 
   long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
                        final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
     boolean hasEvents = preUpdate();
 
-    final ThreadState perThread = flushControl.obtainAndLock();
+    final DocumentsWriterPerThread dwpt = flushControl.obtainAndLock();
     final DocumentsWriterPerThread flushingDWPT;
     long seqNo;
 
     try {
-      // This must happen after we've pulled the ThreadState because IW.close
-      // waits for all ThreadStates to be released:
+      // This must happen after we've pulled the DWPT because IW.close
+      // waits for all DWPT to be released:
       ensureOpen();
-      ensureInitialized(perThread);
-      assert perThread.isInitialized();
-      final DocumentsWriterPerThread dwpt = perThread.dwpt;
       final int dwptNumDocs = dwpt.getNumDocsInRAM();
       try {
         seqNo = dwpt.updateDocuments(docs, analyzer, delNode, flushNotifications);
       } finally {
         if (dwpt.isAborted()) {
-          flushControl.doOnAbort(perThread);
+          flushControl.doOnAbort(dwpt);
         }
         // We don't know how many documents were actually
         // counted as indexed, so we must subtract here to
@@ -459,13 +431,14 @@ final class DocumentsWriter implements Closeable, Accountable {
         numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
       }
       final boolean isUpdate = delNode != null && delNode.isDelete();
-      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
-
-      assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
-      perThread.lastSeqNo = seqNo;
-
+      flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);
     } finally {
-      perThreadPool.release(perThread);
+      if (dwpt.isFlushPending() || dwpt.isAborted()) {
+        dwpt.unlock();
+      } else {
+        perThreadPool.marksAsFreeAndUnlock(dwpt);
+      }
+      assert dwpt.isHeldByCurrentThread() == false : "we didn't release the dwpt even on abort";
     }
 
     if (postUpdate(flushingDWPT, hasEvents)) {
@@ -477,6 +450,7 @@ final class DocumentsWriter implements Closeable, Accountable {
   private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
     boolean hasEvents = false;
     while (flushingDWPT != null) {
+      assert flushingDWPT.hasFlushed() == false;
       hasEvents = true;
       boolean success = false;
       DocumentsWriterFlushQueue.FlushTicket ticket = null;
@@ -536,7 +510,7 @@ final class DocumentsWriter implements Closeable, Accountable {
          * Now we are done and try to flush the ticket queue if the head of the
          * queue has already finished the flush.
          */
-        if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) {
+        if (ticketQueue.getTicketCount() >= perThreadPool.size()) {
           // This means there is a backlog: the one
           // thread in innerPurge can't keep up with all
           // other threads flushing segments.  In this case
@@ -727,7 +701,7 @@ final class DocumentsWriter implements Closeable, Accountable {
    *
    * This is a subset of the value returned by {@link #ramBytesUsed()}
    */
-  public long getFlushingBytes() {
+  long getFlushingBytes() {
     return flushControl.getFlushingBytes();
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index dd339f2..2c1a935 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -17,8 +17,9 @@
 package org.apache.lucene.index;
 
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -26,7 +27,7 @@ import java.util.Locale;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -43,7 +44,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
  * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
  * space exhaustion.
  */
-final class DocumentsWriterFlushControl implements Accountable {
+final class DocumentsWriterFlushControl implements Accountable, Closeable {
 
   private final long hardMaxBytesPerDWPT;
   private long activeBytes = 0;
@@ -52,11 +53,18 @@ final class DocumentsWriterFlushControl implements Accountable {
   private int numDocsSinceStalled = 0; // only with assert
   final AtomicBoolean flushDeletes = new AtomicBoolean(false);
   private boolean fullFlush = false;
+  private boolean fullFlushMarkDone = false; // only for assertion that we don't get stale DWPTs from the pool
+  // The flushQueue is used to concurrently distribute DWPTs that are ready to be flushed, i.e. when a full flush is in
+  // progress. This might be triggered by a commit or NRT refresh. The trigger will only walk all eligible DWPTs and
+  // mark them as flushable, putting them in the flushQueue ready for other threads (i.e. indexing threads) to help flush.
   private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<>();
   // only for safety reasons if a DWPT is close to the RAM limit
-  private final Queue<BlockedFlush> blockedFlushes = new LinkedList<>();
-  private final IdentityHashMap<DocumentsWriterPerThread, Long> flushingWriters = new IdentityHashMap<>();
-
+  private final Queue<DocumentsWriterPerThread> blockedFlushes = new LinkedList<>();
+  // flushingWriters holds all currently flushing writers. There might be writers in this list that
+  // are also in the flushQueue which means that writers in the flushingWriters list are not necessarily
+  // already actively flushing. They are only in the state of flushing and might be picked up in the future by
+  // polling the flushQueue
+  private final List<DocumentsWriterPerThread> flushingWriters = new ArrayList<>();
 
   double maxConfiguredRamBuffer = 0;
   long peakActiveBytes = 0;// only with assert
@@ -86,11 +94,11 @@ final class DocumentsWriterFlushControl implements Accountable {
     return activeBytes;
   }
 
-  public long getFlushingBytes() {
+  long getFlushingBytes() {
     return flushBytes;
   }
 
-  public synchronized long netBytes() {
+  synchronized long netBytes() {
     return flushBytes + activeBytes;
   }
   
@@ -139,15 +147,14 @@ final class DocumentsWriterFlushControl implements Accountable {
     return true;
   }
 
-  private void commitPerThreadBytes(ThreadState perThread) {
-    final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed;
-    perThread.bytesUsed += delta;
+  private synchronized void commitPerThreadBytes(DocumentsWriterPerThread perThread) {
+    final long delta = perThread.commitLastBytesUsed();
     /*
      * We need to differentiate here if we are pending since setFlushPending
      * moves the perThread memory to the flushBytes and we could be set to
      * pending during a delete
      */
-    if (perThread.flushPending) {
+    if (perThread.isFlushPending()) {
       flushBytes += delta;
     } else {
       activeBytes += delta;
@@ -165,16 +172,16 @@ final class DocumentsWriterFlushControl implements Accountable {
     return true;
   }
 
-  synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
+  synchronized DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) {
     try {
       commitPerThreadBytes(perThread);
-      if (!perThread.flushPending) {
+      if (!perThread.isFlushPending()) {
         if (isUpdate) {
           flushPolicy.onUpdate(this, perThread);
         } else {
           flushPolicy.onInsert(this, perThread);
         }
-        if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
+        if (!perThread.isFlushPending() && perThread.bytesUsed() > hardMaxBytesPerDWPT) {
           // Safety check to prevent a single DWPT exceeding its RAM limit. This
           // is super important since we can not address more than 2048 MB per DWPT
           setFlushPending(perThread);
@@ -187,21 +194,24 @@ final class DocumentsWriterFlushControl implements Accountable {
     }
   }
 
-  private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
+  private DocumentsWriterPerThread checkout(DocumentsWriterPerThread perThread, boolean markPending) {
+    assert Thread.holdsLock(this);
     if (fullFlush) {
-      if (perThread.flushPending) {
+      if (perThread.isFlushPending()) {
         checkoutAndBlock(perThread);
         return nextPendingFlush();
-      } else {
-        return null;
       }
     } else {
       if (markPending) {
         assert perThread.isFlushPending() == false;
         setFlushPending(perThread);
       }
-      return tryCheckoutForFlush(perThread);
+
+      if (perThread.isFlushPending()) {
+        return checkOutForFlush(perThread);
+      }
     }
+    return null;
   }
   
   private boolean assertNumDocsSinceStalled(boolean stalled) {
@@ -221,11 +231,10 @@ final class DocumentsWriterFlushControl implements Accountable {
   }
 
   synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
-    assert flushingWriters.containsKey(dwpt);
+    assert flushingWriters.contains(dwpt);
     try {
-      Long bytes = flushingWriters.remove(dwpt);
-      flushBytes -= bytes.longValue();
-      perThreadPool.recycle(dwpt);
+      flushingWriters.remove(dwpt);
+      flushBytes -= dwpt.getLastCommittedBytesUsed();
       assert assertMemory();
     } finally {
       try {
@@ -281,15 +290,15 @@ final class DocumentsWriterFlushControl implements Accountable {
   }
 
   /**
-   * Sets flush pending state on the given {@link ThreadState}. The
-   * {@link ThreadState} must have indexed at least on Document and must not be
+   * Sets flush pending state on the given {@link DocumentsWriterPerThread}. The
+   * {@link DocumentsWriterPerThread} must have indexed at least one Document and must not be
    * already pending.
    */
-  public synchronized void setFlushPending(ThreadState perThread) {
-    assert !perThread.flushPending;
-    if (perThread.dwpt.getNumDocsInRAM() > 0) {
-      perThread.flushPending = true; // write access synced
-      final long bytes = perThread.bytesUsed;
+  public synchronized void setFlushPending(DocumentsWriterPerThread perThread) {
+    assert !perThread.isFlushPending();
+    if (perThread.getNumDocsInRAM() > 0) {
+      perThread.setFlushPending(); // write access synced
+      final long bytes = perThread.getLastCommittedBytesUsed();
       flushBytes += bytes;
       activeBytes -= bytes;
       numPending++; // write access synced
@@ -298,70 +307,57 @@ final class DocumentsWriterFlushControl implements Accountable {
     
   }
   
-  synchronized void doOnAbort(ThreadState state) {
+  synchronized void doOnAbort(DocumentsWriterPerThread perThread) {
     try {
-      if (state.flushPending) {
-        flushBytes -= state.bytesUsed;
+      assert perThreadPool.isRegistered(perThread);
+      assert perThread.isHeldByCurrentThread();
+      if (perThread.isFlushPending()) {
+        flushBytes -= perThread.getLastCommittedBytesUsed();
       } else {
-        activeBytes -= state.bytesUsed;
+        activeBytes -= perThread.getLastCommittedBytesUsed();
       }
       assert assertMemory();
       // Take it out of the loop this DWPT is stale
-      perThreadPool.reset(state);
     } finally {
       updateStallState();
+      boolean checkedOut = perThreadPool.checkout(perThread);
+      assert checkedOut;
     }
   }
 
-  synchronized DocumentsWriterPerThread tryCheckoutForFlush(
-      ThreadState perThread) {
-   return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
-  }
-  
-  private void checkoutAndBlock(ThreadState perThread) {
-    perThread.lock();
-    try {
-      assert perThread.flushPending : "can not block non-pending threadstate";
-      assert fullFlush : "can not block if fullFlush == false";
-      final DocumentsWriterPerThread dwpt;
-      final long bytes = perThread.bytesUsed;
-      dwpt = perThreadPool.reset(perThread);
-      numPending--;
-      blockedFlushes.add(new BlockedFlush(dwpt, bytes));
-    } finally {
-      perThread.unlock();
-    }
+  private void checkoutAndBlock(DocumentsWriterPerThread perThread) {
+    assert perThreadPool.isRegistered(perThread);
+    assert perThread.isHeldByCurrentThread();
+    assert perThread.isFlushPending() : "can not block non-pending threadstate";
+    assert fullFlush : "can not block if fullFlush == false";
+    numPending--;
+    blockedFlushes.add(perThread);
+    boolean checkedOut = perThreadPool.checkout(perThread);
+    assert checkedOut;
   }
 
-  private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) {
+  private synchronized DocumentsWriterPerThread checkOutForFlush(DocumentsWriterPerThread perThread) {
     assert Thread.holdsLock(this);
-    assert perThread.flushPending;
+    assert perThread.isFlushPending();
+    assert perThread.isHeldByCurrentThread();
+    assert perThreadPool.isRegistered(perThread);
     try {
-      // We are pending so all memory is already moved to flushBytes
-      if (perThread.tryLock()) {
-        try {
-          if (perThread.isInitialized()) {
-            assert perThread.isHeldByCurrentThread();
-            final DocumentsWriterPerThread dwpt;
-            final long bytes = perThread.bytesUsed; // do that before
-                                                         // replace!
-            dwpt = perThreadPool.reset(perThread);
-            assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
-            // Record the flushing DWPT to reduce flushBytes in doAfterFlush
-            flushingWriters.put(dwpt, Long.valueOf(bytes));
-            numPending--; // write access synced
-            return dwpt;
-          }
-        } finally {
-          perThread.unlock();
-        }
-      }
-      return null;
+        addFlushingDWPT(perThread);
+        numPending--; // write access synced
+        boolean checkedOut = perThreadPool.checkout(perThread);
+        assert checkedOut;
+        return perThread;
     } finally {
       updateStallState();
     }
   }
 
+  private void addFlushingDWPT(DocumentsWriterPerThread perThread) {
+    assert flushingWriters.contains(perThread) == false : "DWPT is already flushing";
+    // Record the flushing DWPT to reduce flushBytes in doAfterFlush
+    flushingWriters.add(perThread);
+  }
+
   @Override
   public String toString() {
     return "DocumentsWriterFlushControl [activeBytes=" + activeBytes
@@ -380,14 +376,17 @@ final class DocumentsWriterFlushControl implements Accountable {
       fullFlush = this.fullFlush;
       numPending = this.numPending;
     }
-    if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
-      final int limit = perThreadPool.getActiveThreadStateCount();
-      for (int i = 0; i < limit && numPending > 0; i++) {
-        final ThreadState next = perThreadPool.getThreadState(i);
-        if (next.flushPending) {
-          final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
-          if (dwpt != null) {
-            return dwpt;
+    if (numPending > 0 && fullFlush == false) { // don't check if we are doing a full flush
+      for (final DocumentsWriterPerThread next : perThreadPool) {
+        if (next.isFlushPending()) {
+          if (next.tryLock()) {
+            try {
+              if (perThreadPool.isRegistered(next)) {
+                return checkOutForFlush(next);
+              }
+            } finally {
+              next.unlock();
+            }
           }
         }
       }
@@ -395,37 +394,17 @@ final class DocumentsWriterFlushControl implements Accountable {
     return null;
   }
 
-  synchronized void setClosed() {
-    // set by DW to signal that we should not release new DWPT after close
-    this.closed = true;
+  @Override
+  public synchronized void close() {
+    // set by DW to signal that we are closing. in this case we try to not stall any threads anymore etc.
+    closed = true;
   }
 
   /**
-   * Returns an iterator that provides access to all currently active {@link ThreadState}s 
+   * Returns an iterator that provides access to all currently active {@link DocumentsWriterPerThread}s
    */
-  public Iterator<ThreadState> allActiveThreadStates() {
-    return getPerThreadsIterator(perThreadPool.getActiveThreadStateCount());
-  }
-  
-  private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
-    return new Iterator<ThreadState>() {
-      int i = 0;
-
-      @Override
-      public boolean hasNext() {
-        return i < upto;
-      }
-
-      @Override
-      public ThreadState next() {
-        return perThreadPool.getThreadState(i++);
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException("remove() not supported.");
-      }
-    };
+  public Iterator<DocumentsWriterPerThread> allActiveWriters() {
+    return perThreadPool.iterator();
   }
 
   synchronized void doOnDelete() {
@@ -458,74 +437,86 @@ final class DocumentsWriterFlushControl implements Accountable {
     flushDeletes.set(true);
   }
   
-  ThreadState obtainAndLock() {
-    final ThreadState perThread = perThreadPool.getAndLock();
-    boolean success = false;
-    try {
-      if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
-        // There is a flush-all in process and this DWPT is
-        // now stale -- enroll it for flush and try for
-        // another DWPT:
-        addFlushableState(perThread);
-      }
-      success = true;
-      // simply return the ThreadState even in a flush all case sine we already hold the lock
-      return perThread;
-    } finally {
-      if (!success) { // make sure we unlock if this fails
-        perThreadPool.release(perThread);
+  DocumentsWriterPerThread obtainAndLock() throws IOException {
+    while (closed == false) {
+      final DocumentsWriterPerThread perThread = perThreadPool.getAndLock();
+      if (perThread.deleteQueue == documentsWriter.deleteQueue) {
+        // simply return the DWPT even in a flush all case since we already hold the lock and the DWPT is not stale
+        // since it has the current delete queue associated with it. This means we have established a happens-before
+        // relationship and all docs indexed into this DWPT are guaranteed to not be flushed with the currently
+        // in-progress full flush.
+        return perThread;
+      } else {
+        try {
+          // we must first assert otherwise the full flush might make progress once we unlock the dwpt
+          assert fullFlush && fullFlushMarkDone == false :
+              "found a stale DWPT but full flush mark phase is already done fullFlush: "
+                  + fullFlush  + " markDone: " + fullFlushMarkDone;
+        } finally {
+          perThread.unlock();
+          // There is a flush-all in process and this DWPT is
+          // now stale - try another one
+        }
       }
     }
+    throw new AlreadyClosedException("flush control is closed");
   }
   
   long markForFullFlush() {
     final DocumentsWriterDeleteQueue flushingQueue;
     long seqNo;
     synchronized (this) {
-      assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
-      assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
+      assert fullFlush == false: "called DWFC#markForFullFlush() while full flush is still running";
+      assert fullFlushMarkDone == false : "full flush collection marker is still set to true";
       fullFlush = true;
       flushingQueue = documentsWriter.deleteQueue;
       // Set a new delete queue - all subsequent DWPT will use this queue until
       // we do another full flush
-
-      perThreadPool.lockNewThreadStates(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off
+      perThreadPool.lockNewWriters(); // no new thread-states while we do a flush otherwise the seqNo accounting might be off
       try {
         // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight.  It's fine
         // if we have some sequence numbers that were never assigned:
-        seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
+        seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.size() + 2;
         flushingQueue.maxSeqNo = seqNo + 1;
         DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation + 1, seqNo + 1);
         documentsWriter.deleteQueue = newQueue;
-
       } finally {
-        perThreadPool.unlockNewThreadStates();
+        perThreadPool.unlockNewWriters();
       }
     }
-    final int limit = perThreadPool.getActiveThreadStateCount();
-    for (int i = 0; i < limit; i++) {
-      final ThreadState next = perThreadPool.getThreadState(i);
-      next.lock();
-      try {
-        if (!next.isInitialized()) {
-          continue; 
-        }
-        assert next.dwpt.deleteQueue == flushingQueue
-            || next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
-            + flushingQueue
-            + " currentqueue: "
-            + documentsWriter.deleteQueue
-            + " perThread queue: "
-            + next.dwpt.deleteQueue
-            + " numDocsInRam: " + next.dwpt.getNumDocsInRAM();
-        if (next.dwpt.deleteQueue != flushingQueue) {
-          // this one is already a new DWPT
-          continue;
+    final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<>();
+    for (final DocumentsWriterPerThread next : perThreadPool.filterAndLock(dwpt -> dwpt.deleteQueue == flushingQueue)) {
+        try {
+          assert next.deleteQueue == flushingQueue
+              || next.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
+              + flushingQueue
+              + " currentqueue: "
+              + documentsWriter.deleteQueue
+              + " perThread queue: "
+              + next.deleteQueue
+              + " numDocsInRam: " + next.getNumDocsInRAM();
+
+          if (next.getNumDocsInRAM() > 0) {
+            final DocumentsWriterPerThread flushingDWPT;
+            synchronized(this) {
+              if (next.isFlushPending() == false) {
+                setFlushPending(next);
+              }
+              flushingDWPT = checkOutForFlush(next);
+            }
+            assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+            assert next == flushingDWPT : "flushControl returned different DWPT";
+            fullFlushBuffer.add(flushingDWPT);
+          } else {
+            // it's possible that we get a DWPT with 0 docs if we flush concurrently to
+            // threads getting DWPTs from the pool. In this case we simply remove it from
+            // the pool and drop it on the floor.
+            boolean checkout = perThreadPool.checkout(next);
+            assert checkout;
+          }
+        } finally {
+          next.unlock();
         }
-        addFlushableState(next);
-      } finally {
-        next.unlock();
-      }
     }
     synchronized (this) {
       /* make sure we move all DWPT that are where concurrently marked as
@@ -535,67 +526,33 @@ final class DocumentsWriterFlushControl implements Accountable {
       pruneBlockedQueue(flushingQueue);   
       assert assertBlockedFlushes(documentsWriter.deleteQueue);
       flushQueue.addAll(fullFlushBuffer);
-      fullFlushBuffer.clear();
       updateStallState();
+      fullFlushMarkDone = true; // at this point we must have collected all DWPTs that belong to the old delete queue
     }
     assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
     return seqNo;
   }
   
   private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
-    final int limit = perThreadPool.getActiveThreadStateCount();
-    for (int i = 0; i < limit; i++) {
-      final ThreadState next = perThreadPool.getThreadState(i);
-      next.lock();
-      try {
-        assert !next.isInitialized() || next.dwpt.deleteQueue == queue : "isInitialized: " + next.isInitialized() + " numDocs: " + (next.isInitialized() ? next.dwpt.getNumDocsInRAM() : 0) ;
-      } finally {
-        next.unlock();
-      }
+    for (final DocumentsWriterPerThread next : perThreadPool) {
+        assert next.deleteQueue == queue : "numDocs: " + next.getNumDocsInRAM();
     }
     return true;
   }
 
-  private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<>();
-
-  void addFlushableState(ThreadState perThread) {
-    if (infoStream.isEnabled("DWFC")) {
-      infoStream.message("DWFC", "addFlushableState " + perThread.dwpt);
-    }
-    final DocumentsWriterPerThread dwpt = perThread.dwpt;
-    assert perThread.isHeldByCurrentThread();
-    assert perThread.isInitialized();
-    assert fullFlush;
-    assert dwpt.deleteQueue != documentsWriter.deleteQueue;
-    if (dwpt.getNumDocsInRAM() > 0) {
-      synchronized(this) {
-        if (!perThread.flushPending) {
-          setFlushPending(perThread);
-        }
-        final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
-        assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
-        assert dwpt == flushingDWPT : "flushControl returned different DWPT";
-        fullFlushBuffer.add(flushingDWPT);
-      }
-    } else {
-      perThreadPool.reset(perThread); // make this state inactive
-    }
-  }
-  
   /**
-   * Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue. 
+   * Prunes the blockedQueue by removing all DWPTs that are associated with the given flush queue.
    */
   private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) {
-    Iterator<BlockedFlush> iterator = blockedFlushes.iterator();
+    assert Thread.holdsLock(this);
+    Iterator<DocumentsWriterPerThread> iterator = blockedFlushes.iterator();
     while (iterator.hasNext()) {
-      BlockedFlush blockedFlush = iterator.next();
-      if (blockedFlush.dwpt.deleteQueue == flushingQueue) {
+      DocumentsWriterPerThread blockedFlush = iterator.next();
+      if (blockedFlush.deleteQueue == flushingQueue) {
         iterator.remove();
-        assert !flushingWriters.containsKey(blockedFlush.dwpt) : "DWPT is already flushing";
-        // Record the flushing DWPT to reduce flushBytes in doAfterFlush
-        flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+        addFlushingDWPT(blockedFlush);
         // don't decr pending here - it's already done when DWPT is blocked
-        flushQueue.add(blockedFlush.dwpt);
+        flushQueue.add(blockedFlush);
       }
     }
   }
@@ -611,14 +568,15 @@ final class DocumentsWriterFlushControl implements Accountable {
         assert blockedFlushes.isEmpty();
       }
     } finally {
-      fullFlush = false;
+      fullFlushMarkDone = fullFlush = false;
+
       updateStallState();
     }
   }
   
   boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) {
-    for (BlockedFlush blockedFlush : blockedFlushes) {
-      assert blockedFlush.dwpt.deleteQueue == flushingQueue;
+    for (DocumentsWriterPerThread blockedFlush : blockedFlushes) {
+      assert blockedFlush.deleteQueue == flushingQueue;
     }
     return true;
   }
@@ -627,7 +585,7 @@ final class DocumentsWriterFlushControl implements Accountable {
    try {
      abortPendingFlushes();
    } finally {
-     fullFlush = false;
+     fullFlushMarkDone = fullFlush = false;
    }
   }
   
@@ -643,15 +601,15 @@ final class DocumentsWriterFlushControl implements Accountable {
           doAfterFlush(dwpt);
         }
       }
-      for (BlockedFlush blockedFlush : blockedFlushes) {
+      for (DocumentsWriterPerThread blockedFlush : blockedFlushes) {
         try {
-          flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
-          documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
-          blockedFlush.dwpt.abort();
+          addFlushingDWPT(blockedFlush); // add the blockedFlushes for correct accounting in doAfterFlush
+          documentsWriter.subtractFlushedNumDocs(blockedFlush.getNumDocsInRAM());
+          blockedFlush.abort();
         } catch (Exception ex) {
           // that's fine we just abort everything here this is best effort
         } finally {
-          doAfterFlush(blockedFlush.dwpt);
+          doAfterFlush(blockedFlush);
         }
       }
     } finally {
@@ -685,16 +643,6 @@ final class DocumentsWriterFlushControl implements Accountable {
     return blockedFlushes.size();
   }
   
-  private static class BlockedFlush {
-    final DocumentsWriterPerThread dwpt;
-    final long bytes;
-    BlockedFlush(DocumentsWriterPerThread dwpt, long bytes) {
-      super();
-      this.dwpt = dwpt;
-      this.bytes = bytes;
-    }
-  }
-
   /**
    * This method will block if too many DWPT are currently flushing and no
    * checked out DWPT are available
@@ -717,51 +665,45 @@ final class DocumentsWriterFlushControl implements Accountable {
     return infoStream;
   }
 
-  synchronized ThreadState findLargestNonPendingWriter() {
-    ThreadState maxRamUsingThreadState = null;
+  synchronized DocumentsWriterPerThread findLargestNonPendingWriter() {
+    DocumentsWriterPerThread maxRamUsingWriter = null;
     long maxRamSoFar = 0;
-    Iterator<ThreadState> activePerThreadsIterator = allActiveThreadStates();
     int count = 0;
-    while (activePerThreadsIterator.hasNext()) {
-      ThreadState next = activePerThreadsIterator.next();
-      if (!next.flushPending) {
-        final long nextRam = next.bytesUsed;
-        if (nextRam > 0 && next.dwpt.getNumDocsInRAM() > 0) {
-          if (infoStream.isEnabled("FP")) {
-            infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.dwpt.getNumDocsInRAM());
-          }
-          count++;
-          if (nextRam > maxRamSoFar) {
-            maxRamSoFar = nextRam;
-            maxRamUsingThreadState = next;
-          }
+    for (DocumentsWriterPerThread next : perThreadPool) {
+      if (next.isFlushPending() == false && next.getNumDocsInRAM() > 0) {
+        final long nextRam = next.bytesUsed();
+        if (infoStream.isEnabled("FP")) {
+          infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.getNumDocsInRAM());
+        }
+        count++;
+        if (nextRam > maxRamSoFar) {
+          maxRamSoFar = nextRam;
+          maxRamUsingWriter = next;
         }
       }
     }
     if (infoStream.isEnabled("FP")) {
       infoStream.message("FP", count + " in-use non-flushing threads states");
     }
-    return maxRamUsingThreadState;
+    return maxRamUsingWriter;
   }
 
   /**
    * Returns the largest non-pending flushable DWPT or <code>null</code> if there is none.
    */
   final DocumentsWriterPerThread checkoutLargestNonPendingWriter() {
-    ThreadState largestNonPendingWriter = findLargestNonPendingWriter();
+    DocumentsWriterPerThread largestNonPendingWriter = findLargestNonPendingWriter();
     if (largestNonPendingWriter != null) {
       // we only lock this very briefly to swap it's DWPT out - we don't go through the DWPTPool and it's free queue
       largestNonPendingWriter.lock();
       try {
-        synchronized (this) {
-          try {
-            if (largestNonPendingWriter.isInitialized() == false) {
-              return nextPendingFlush();
-            } else {
+        if (perThreadPool.isRegistered(largestNonPendingWriter)) {
+          synchronized (this) {
+            try {
               return checkout(largestNonPendingWriter, largestNonPendingWriter.isFlushPending() == false);
+            } finally {
+              updateStallState();
             }
-          } finally {
-            updateStallState();
           }
         }
       } finally {
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index a5f79af..73b64f0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -24,6 +24,7 @@ import java.util.Locale;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
@@ -41,6 +42,7 @@ import org.apache.lucene.util.Counter;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.IntBlockPool;
+import org.apache.lucene.util.SetOnce;
 import org.apache.lucene.util.StringHelper;
 import org.apache.lucene.util.Version;
 
@@ -156,6 +158,9 @@ final class DocumentsWriterPerThread {
   final BufferedUpdates pendingUpdates;
   final SegmentInfo segmentInfo;     // Current segment we are working on
   private boolean aborted = false;   // True if we aborted
+  private SetOnce<Boolean> flushPending = new SetOnce<>();
+  private volatile long lastCommittedBytesUsed;
+  private SetOnce<Boolean> hasFlushed = new SetOnce<>();
 
   private final FieldInfos.Builder fieldInfos;
   private final InfoStream infoStream;
@@ -169,6 +174,7 @@ final class DocumentsWriterPerThread {
   private final LiveIndexWriterConfig indexWriterConfig;
   private final boolean enableTestPoints;
   private final int indexVersionCreated;
+  private final ReentrantLock lock = new ReentrantLock();
 
   public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
                                   FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
@@ -350,6 +356,7 @@ final class DocumentsWriterPerThread {
 
   /** Flush all pending docs to a new segment */
   FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
+    assert flushPending.get() == Boolean.TRUE;
     assert numDocsInRAM > 0;
     assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
     segmentInfo.setMaxDoc(numDocsInRAM);
@@ -445,6 +452,7 @@ final class DocumentsWriterPerThread {
       throw t;
     } finally {
       maybeAbort("flush", flushNotifications);
+      hasFlushed.set(Boolean.TRUE);
     }
   }
 
@@ -600,5 +608,81 @@ final class DocumentsWriterPerThread {
       + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborted=" + aborted + ", numDocsInRAM="
         + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
   }
-  
+
+
+  /**
+   * Returns true iff this DWPT is marked as flush pending
+   */
+  boolean isFlushPending() {
+    return flushPending.get() == Boolean.TRUE;
+  }
+
+  /**
+   * Sets this DWPT as flush pending. This can only be set once.
+   */
+  void setFlushPending() {
+    flushPending.set(Boolean.TRUE);
+  }
+
+
+  /**
+   * Returns the last committed bytes for this DWPT. This method can be called
+   * without acquiring the DWPTs lock.
+   */
+  long getLastCommittedBytesUsed() {
+    return lastCommittedBytesUsed;
+  }
+
+  /**
+   * Commits the current {@link #bytesUsed()} and stores its value for later reuse.
+   * The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()}
+   * @return the delta between the current {@link #bytesUsed()} and the current {@link #getLastCommittedBytesUsed()}
+   */
+  long commitLastBytesUsed() {
+    assert isHeldByCurrentThread();
+    long delta = bytesUsed() - lastCommittedBytesUsed;
+    lastCommittedBytesUsed += delta;
+    return delta;
+  }
+
+  /**
+   * Locks this DWPT for exclusive access.
+   * @see ReentrantLock#lock()
+   */
+  void lock() {
+    lock.lock();
+  }
+
+  /**
+   * Acquires the DWPT's lock only if it is not held by another thread at the time
+   * of invocation.
+   * @return true if the lock was acquired.
+   * @see ReentrantLock#tryLock()
+   */
+  boolean tryLock() {
+    return lock.tryLock();
+  }
+
+  /**
+   * Returns true if the DWPT's lock is held by the current thread
+   * @see ReentrantLock#isHeldByCurrentThread()
+   */
+  boolean isHeldByCurrentThread() {
+    return lock.isHeldByCurrentThread();
+  }
+
+  /**
+   * Unlocks the DWPT's lock
+   * @see ReentrantLock#unlock()
+   */
+  void unlock() {
+    lock.unlock();
+  }
+
+  /**
+   * Returns <code>true</code> iff this DWPT has been flushed
+   */
+  boolean hasFlushed() {
+    return hasFlushed.get() == Boolean.TRUE;
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 7fafd41..4510ad6 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -16,228 +16,177 @@
  */
 package org.apache.lucene.index;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Set;
+import java.util.function.Predicate;
 
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
- * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
- * and their thread assignments during indexing. Each {@link ThreadState} holds
- * a reference to a {@link DocumentsWriterPerThread} that is once a
- * {@link ThreadState} is obtained from the pool exclusively used for indexing a
- * single document by the obtaining thread. Each indexing thread must obtain
- * such a {@link ThreadState} to make progress. Depending on the
- * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState}
+ * {@link DocumentsWriterPerThreadPool} controls {@link DocumentsWriterPerThread} instances
+ * and their thread assignments during indexing. Each {@link DocumentsWriterPerThread} is, once
+ * obtained from the pool, exclusively used for indexing a
+ * single document or list of documents by the obtaining thread. Each indexing thread must obtain
+ * such a {@link DocumentsWriterPerThread} to make progress. Depending on the
+ * {@link DocumentsWriterPerThreadPool} implementation {@link DocumentsWriterPerThread}
  * assignments might differ from document to document.
  * <p>
- * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool
- * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a
- * new {@link DocumentsWriterPerThread} instance.
+ * Once a {@link DocumentsWriterPerThread} is selected for flush the {@link DocumentsWriterPerThread} will
+ * be checked out of the thread pool and won't be reused for indexing. See {@link #checkout(DocumentsWriterPerThread)}.
  * </p>
  */
-final class DocumentsWriterPerThreadPool {
-  
-  /**
-   * {@link ThreadState} references and guards a
-   * {@link DocumentsWriterPerThread} instance that is used during indexing to
-   * build a in-memory index segment. {@link ThreadState} also holds all flush
-   * related per-thread data controlled by {@link DocumentsWriterFlushControl}.
-   * <p>
-   * A {@link ThreadState}, its methods and members should only accessed by one
-   * thread a time. Users must acquire the lock via {@link ThreadState#lock()}
-   * and release the lock in a finally block via {@link ThreadState#unlock()}
-   * before accessing the state.
-   */
-  @SuppressWarnings("serial")
-  final static class ThreadState extends ReentrantLock {
-    DocumentsWriterPerThread dwpt;
-    // TODO this should really be part of DocumentsWriterFlushControl
-    // write access guarded by DocumentsWriterFlushControl
-    volatile boolean flushPending = false;
-    // TODO this should really be part of DocumentsWriterFlushControl
-    // write access guarded by DocumentsWriterFlushControl
-    long bytesUsed = 0;
-
-    // set by DocumentsWriter after each indexing op finishes
-    volatile long lastSeqNo;
-
-    ThreadState(DocumentsWriterPerThread dpwt) {
-      this.dwpt = dpwt;
-    }
-    
-    private void reset() {
-      assert this.isHeldByCurrentThread();
-      this.dwpt = null;
-      this.bytesUsed = 0;
-      this.flushPending = false;
-    }
-    
-    boolean isInitialized() {
-      assert this.isHeldByCurrentThread();
-      return dwpt != null;
-    }
-    
-    /**
-     * Returns the number of currently active bytes in this ThreadState's
-     * {@link DocumentsWriterPerThread}
-     */
-    public long getBytesUsedPerThread() {
-      assert this.isHeldByCurrentThread();
-      // public for FlushPolicy
-      return bytesUsed;
-    }
-    
-    /**
-     * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread}
-     */
-    public DocumentsWriterPerThread getDocumentsWriterPerThread() {
-      assert this.isHeldByCurrentThread();
-      // public for FlushPolicy
-      return dwpt;
-    }
-    
-    /**
-     * Returns <code>true</code> iff this {@link ThreadState} is marked as flush
-     * pending otherwise <code>false</code>
-     */
-    public boolean isFlushPending() {
-      return flushPending;
-    }
-  }
+final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerThread>, Closeable {
 
-  private final List<ThreadState> threadStates = new ArrayList<>();
+  private final Set<DocumentsWriterPerThread> dwpts = Collections.newSetFromMap(new IdentityHashMap<>());
+  private final Deque<DocumentsWriterPerThread> freeList = new ArrayDeque<>();
+  private final IOSupplier<DocumentsWriterPerThread> dwptFactory;
+  private int takenWriterPermits = 0;
+  private boolean closed;
 
-  private final List<ThreadState> freeList = new ArrayList<>();
 
-  private int takenThreadStatePermits = 0;
+  DocumentsWriterPerThreadPool(IOSupplier<DocumentsWriterPerThread> dwptFactory) {
+    this.dwptFactory = dwptFactory;
+  }
 
   /**
-   * Returns the active number of {@link ThreadState} instances.
+   * Returns the active number of {@link DocumentsWriterPerThread} instances.
    */
-  synchronized int getActiveThreadStateCount() {
-    return threadStates.size();
+  synchronized int size() {
+    return dwpts.size();
   }
 
-  synchronized void lockNewThreadStates() {
-    // this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0
-    // any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some
+  synchronized void lockNewWriters() {
+    // this is similar to a semaphore - we need to acquire all permits ie. takenWriterPermits must be == 0
+    // any call to lockNewWriters() must be followed by unlockNewWriters() otherwise we will deadlock at some
     // point
-    assert takenThreadStatePermits >= 0;
-    takenThreadStatePermits++;
+    assert takenWriterPermits >= 0;
+    takenWriterPermits++;
   }
 
-  synchronized void unlockNewThreadStates() {
-    assert takenThreadStatePermits > 0;
-    takenThreadStatePermits--;
-    if (takenThreadStatePermits == 0) {
+  synchronized void unlockNewWriters() {
+    assert takenWriterPermits > 0;
+    takenWriterPermits--;
+    if (takenWriterPermits == 0) {
       notifyAll();
     }
   }
+
   /**
-   * Returns a new {@link ThreadState} iff any new state is available otherwise
-   * <code>null</code>.
-   * <p>
-   * NOTE: the returned {@link ThreadState} is already locked iff non-
-   * <code>null</code>.
-   * 
-   * @return a new {@link ThreadState} iff any new state is available otherwise
-   *         <code>null</code>
+   * Returns a new already locked {@link DocumentsWriterPerThread}
+   *
+   * @return a new {@link DocumentsWriterPerThread}
    */
-  private synchronized ThreadState newThreadState() {
-    assert takenThreadStatePermits >= 0;
-    while (takenThreadStatePermits > 0) {
-      // we can't create new thread-states while not all permits are available
+  private synchronized DocumentsWriterPerThread newWriter() throws IOException {
+    assert takenWriterPermits >= 0;
+    while (takenWriterPermits > 0) {
+      // we can't create new DWPTs while not all permits are available
       try {
         wait();
       } catch (InterruptedException ie) {
         throw new ThreadInterruptedException(ie);
       }
     }
-    ThreadState threadState = new ThreadState(null);
-    threadState.lock(); // lock so nobody else will get this ThreadState
-    threadStates.add(threadState);
-    return threadState;
-}
-
-  DocumentsWriterPerThread reset(ThreadState threadState) {
-    assert threadState.isHeldByCurrentThread();
-    final DocumentsWriterPerThread dwpt = threadState.dwpt;
-    threadState.reset();
+    DocumentsWriterPerThread dwpt = dwptFactory.get();
+    dwpt.lock(); // lock so nobody else will get this DWPT
+    dwpts.add(dwpt);
     return dwpt;
   }
-  
-  void recycle(DocumentsWriterPerThread dwpt) {
-    // don't recycle DWPT by default
-  }
 
   // TODO: maybe we should try to do load leveling here: we want roughly even numbers
   // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
 
-  /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
-  ThreadState getAndLock() {
-    ThreadState threadState = null;
+  /** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */
+  DocumentsWriterPerThread getAndLock() throws IOException {
     synchronized (this) {
-      if (freeList.isEmpty()) {
-        // ThreadState is already locked before return by this method:
-        return newThreadState();
-      } else {
-        // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
-        // limited number of thread states:
-        threadState = freeList.remove(freeList.size()-1);
-
-        if (threadState.dwpt == null) {
-          // This thread-state is not initialized, e.g. it
-          // was just flushed. See if we can instead find
-          // another free thread state that already has docs
-          // indexed. This way if incoming thread concurrency
-          // has decreased, we don't leave docs
-          // indefinitely buffered, tying up RAM.  This
-          // will instead get those thread states flushed,
-          // freeing up RAM for larger segment flushes:
-          for(int i=0;i<freeList.size();i++) {
-            ThreadState ts = freeList.get(i);
-            if (ts.dwpt != null) {
-              // Use this one instead, and swap it with
-              // the un-initialized one:
-              freeList.set(i, threadState);
-              threadState = ts;
-              break;
-            }
-          }
+      if (closed) {
+        throw new AlreadyClosedException("DWPTPool is already closed");
+      }
+      // Important that we are LIFO here! This way if number of concurrent indexing threads was once high,
+      // but has now reduced, we only use a limited number of DWPTs. This also means that if we suddenly have
+      // only a single thread indexing, it keeps getting the most recently freed DWPT that it can lock.
+      final Iterator<DocumentsWriterPerThread> descendingIterator = freeList.descendingIterator();
+      while (descendingIterator.hasNext()) {
+        DocumentsWriterPerThread perThread = descendingIterator.next();
+        if (perThread.tryLock()) {
+          descendingIterator.remove();
+          return perThread;
         }
       }
+      // DWPT is already locked before return by this method:
+      return newWriter();
     }
-
-    // This could take time, e.g. if the threadState is [briefly] checked for flushing:
-    threadState.lock();
-
-    return threadState;
   }
 
-  void release(ThreadState state) {
-    state.unlock();
+  void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
     synchronized (this) {
+      assert dwpts.contains(state) : "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
       freeList.add(state);
     }
+    state.unlock();
   }
-  
+
+  @Override
+  public synchronized Iterator<DocumentsWriterPerThread> iterator() {
+    return Arrays.asList(dwpts.toArray(new DocumentsWriterPerThread[dwpts.size()])).iterator(); // copy on read - this is a quick op since num states is low
+  }
+
+  /**
+   * Returns all DWPTs that match the given predicate and are still registered with the pool, i.e. that can
+   * be checked out via {@link #checkout(DocumentsWriterPerThread)}. All DWPTs returned from this method are
+   * already locked and {@link #isRegistered(DocumentsWriterPerThread)} will return <code>true</code> for them.
+   */
+  List<DocumentsWriterPerThread> filterAndLock(Predicate<DocumentsWriterPerThread> predicate) {
+    List<DocumentsWriterPerThread> list = new ArrayList<>();
+    for (DocumentsWriterPerThread perThread : this) {
+      if (predicate.test(perThread)) {
+        perThread.lock();
+        if (isRegistered(perThread)) {
+          list.add(perThread);
+        } else {
+          // somebody else has taken this DWPT out of the pool.
+          // unlock and let it go
+          perThread.unlock();
+        }
+      }
+    }
+    return Collections.unmodifiableList(list);
+  }
+
+  /**
+   * Removes the given DWPT from the pool unless it's already been removed before.
+   * @return <code>true</code> iff the given DWPT has been removed. Otherwise <code>false</code>
+   */
+  synchronized boolean checkout(DocumentsWriterPerThread perThread) {
+   assert perThread.isHeldByCurrentThread();
+    if (dwpts.remove(perThread)) {
+      freeList.remove(perThread);
+    } else {
+      assert freeList.contains(perThread) == false;
+      return false;
+    }
+    return true;
+  }
+
   /**
-   * Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
-   * given ord.
-   * 
-   * @param ord
-   *          the ordinal of the {@link ThreadState}
-   * @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
-   *         given ord.
+   * Returns <code>true</code> if this DWPT is still part of the pool
    */
-  synchronized ThreadState getThreadState(int ord) {
-    return threadStates.get(ord);
+  synchronized boolean isRegistered(DocumentsWriterPerThread perThread) {
+    return dwpts.contains(perThread);
   }
 
-  // TODO: merge this with getActiveThreadStateCount: they are the same!
-  synchronized int getMaxThreadStates() {
-    return threadStates.size();
+  @Override
+  public synchronized void close() {
+    this.closed = true;
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
index c46e3d2..30a9277 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
@@ -19,7 +19,6 @@ package org.apache.lucene.index;
 import java.util.IdentityHashMap;
 import java.util.Map;
 
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
@@ -32,9 +31,9 @@ import org.apache.lucene.util.ThreadInterruptedException;
  * <p>
  * To prevent OOM Errors and ensure IndexWriter's stability this class blocks
  * incoming threads from indexing once 2 x number of available
- * {@link ThreadState}s in {@link DocumentsWriterPerThreadPool} is exceeded.
+ * {@link DocumentsWriterPerThread}s in {@link DocumentsWriterPerThreadPool} is exceeded.
  * Once flushing catches up and the number of flushing DWPT is equal or lower
- * than the number of active {@link ThreadState}s threads are released and can
+ * than the number of active {@link DocumentsWriterPerThread}s threads are released and can
  * continue indexing.
  */
 final class DocumentsWriterStallControl {
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
index 6620aef..ed540a4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
@@ -17,8 +17,6 @@
 package org.apache.lucene.index;
 
 
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-
 /**
  * Default {@link FlushPolicy} implementation that flushes new segments based on
  * RAM used and document count depending on the IndexWriter's
@@ -27,11 +25,11 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
  * 
  * <ul>
  * <li>
- * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * - applies pending delete operations based on the global number of buffered
  * delete terms if the consumed memory is greater than {@link IndexWriterConfig#getRAMBufferSizeMB()}</li>.
  * <li>
- * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * - flushes either on the number of documents per
  * {@link DocumentsWriterPerThread} (
  * {@link DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active
@@ -39,11 +37,11 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
  * {@link IndexWriterConfig#getMaxBufferedDocs()} or
  * {@link IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively</li>
  * <li>
- * {@link #onUpdate(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onUpdate(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * - calls
- * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * and
- * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThreadPool.ThreadState)}
+ * {@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)}
  * in order</li>
  * </ul>
  * All {@link IndexWriterConfig} settings are used to mark
@@ -58,7 +56,7 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 class FlushByRamOrCountsPolicy extends FlushPolicy {
 
   @Override
-  public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
+  public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
     if ((flushOnRAM() && control.getDeleteBytesUsed() > 1024*1024*indexWriterConfig.getRAMBufferSizeMB())) {
       control.setApplyAllDeletes();
       if (infoStream.isEnabled("FP")) {
@@ -68,12 +66,12 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
   }
 
   @Override
-  public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
+  public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
     if (flushOnDocCount()
-        && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
+        && perThread.getNumDocsInRAM() >= indexWriterConfig
             .getMaxBufferedDocs()) {
       // Flush this state by num docs
-      control.setFlushPending(state);
+      control.setFlushPending(perThread);
     } else if (flushOnRAM()) {// flush by RAM
       final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
       final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
@@ -81,7 +79,7 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
         if (infoStream.isEnabled("FP")) {
           infoStream.message("FP", "trigger flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
         }
-        markLargestWriterPending(control, state, totalRam);
+        markLargestWriterPending(control, perThread);
       }
     }
   }
@@ -91,8 +89,8 @@ class FlushByRamOrCountsPolicy extends FlushPolicy {
    * pending
    */
   protected void markLargestWriterPending(DocumentsWriterFlushControl control,
-      ThreadState perThreadState, final long currentBytesPerThread) {
-    ThreadState largestNonPendingWriter = findLargestNonPendingWriter(control, perThreadState);
+      DocumentsWriterPerThread perThread) {
+    DocumentsWriterPerThread largestNonPendingWriter = findLargestNonPendingWriter(control, perThread);
     if (largestNonPendingWriter != null) {
       control.setFlushPending(largestNonPendingWriter);
     }
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
index 3f64100..7140249 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
@@ -16,7 +16,6 @@
  */
 package org.apache.lucene.index;
 
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.InfoStream;
 
@@ -35,13 +34,12 @@ import org.apache.lucene.util.InfoStream;
  * {@link IndexWriter} consults the provided {@link FlushPolicy} to control the
  * flushing process. The policy is informed for each added or updated document
  * as well as for each delete term. Based on the {@link FlushPolicy}, the
- * information provided via {@link ThreadState} and
+ * information provided via {@link DocumentsWriterPerThread} and
  * {@link DocumentsWriterFlushControl}, the {@link FlushPolicy} decides if a
  * {@link DocumentsWriterPerThread} needs flushing and mark it as flush-pending
  * via {@link DocumentsWriterFlushControl#setFlushPending}, or if deletes need
  * to be applied.
  * 
- * @see ThreadState
  * @see DocumentsWriterFlushControl
  * @see DocumentsWriterPerThread
  * @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
@@ -52,38 +50,38 @@ abstract class FlushPolicy {
 
   /**
    * Called for each delete term. If this is a delete triggered due to an update
-   * the given {@link ThreadState} is non-null.
+   * the given {@link DocumentsWriterPerThread} is non-null.
    * <p>
    * Note: This method is called synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
-   * thread holds the lock on the given {@link ThreadState}
+   * thread holds the lock on the given {@link DocumentsWriterPerThread}
    */
   public abstract void onDelete(DocumentsWriterFlushControl control,
-      ThreadState state);
+                                DocumentsWriterPerThread perThread);
 
   /**
-   * Called for each document update on the given {@link ThreadState}'s
+   * Called for each document update on the given
    * {@link DocumentsWriterPerThread}.
    * <p>
    * Note: This method is called  synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
-   * thread holds the lock on the given {@link ThreadState}
+   * thread holds the lock on the given {@link DocumentsWriterPerThread}
    */
-  public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) {
-    onInsert(control, state);
-    onDelete(control, state);
+  public void onUpdate(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+    onInsert(control, perThread);
+    onDelete(control, perThread);
   }
 
   /**
-   * Called for each document addition on the given {@link ThreadState}s
+   * Called for each document addition on the given
    * {@link DocumentsWriterPerThread}.
    * <p>
    * Note: This method is synchronized by the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
-   * thread holds the lock on the given {@link ThreadState}
+   * thread holds the lock on the given {@link DocumentsWriterPerThread}
    */
   public abstract void onInsert(DocumentsWriterFlushControl control,
-      ThreadState state);
+                                DocumentsWriterPerThread perThread);
 
   /**
    * Called by DocumentsWriter to initialize the FlushPolicy
@@ -94,18 +92,18 @@ abstract class FlushPolicy {
   }
 
   /**
-   * Returns the current most RAM consuming non-pending {@link ThreadState} with
+   * Returns the current most RAM consuming non-pending {@link DocumentsWriterPerThread} with
    * at least one indexed document.
    * <p>
    * This method will never return <code>null</code>
    */
-  protected ThreadState findLargestNonPendingWriter(
-      DocumentsWriterFlushControl control, ThreadState perThreadState) {
-    assert perThreadState.dwpt.getNumDocsInRAM() > 0;
+  protected DocumentsWriterPerThread findLargestNonPendingWriter(
+      DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+    assert perThread.getNumDocsInRAM() > 0;
     // the dwpt which needs to be flushed eventually
-    ThreadState maxRamUsingThreadState = control.findLargestNonPendingWriter();
+    DocumentsWriterPerThread maxRamUsingWriter = control.findLargestNonPendingWriter();
     assert assertMessage("set largest ram consuming thread pending on lower watermark");
-    return maxRamUsingThreadState;
+    return maxRamUsingWriter;
   }
   
   private boolean assertMessage(String s) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index d9cf673..ce058e2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2482,7 +2482,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
               globalFieldNumberMap.clear();
               success = true;
               long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
-              docWriter.setLastSeqNo(seqNo);
               return seqNo;
             } finally {
               if (success == false) {
@@ -4964,7 +4963,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   //   finishStartCommit
   //   startCommitMergeDeletes
   //   startMergeInit
-  //   DocumentsWriter.ThreadState.init start
+  //   DocumentsWriterPerThread addDocuments start
   private final void testPoint(String message) {
     if (enableTestPoints) {
       assert infoStream.isEnabled("TP"); // don't enable unless you need them.
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 3edc9fc..4cdc9c0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -312,28 +312,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
     return mergePolicy;
   }
 
-  /** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
-   * IndexWriter to assign thread-states to incoming indexing threads.
-   * <p>
-   * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
-   * other {@link IndexWriter} instances once it has been initialized / associated with an
-   * {@link IndexWriter}.
-   * </p>
-   * <p>
-   * NOTE: This only takes effect when IndexWriter is first created.</p>*/
-  IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
-    if (threadPool == null) {
-      throw new IllegalArgumentException("threadPool must not be null");
-    }
-    this.indexerThreadPool = threadPool;
-    return this;
-  }
-
-  @Override
-  DocumentsWriterPerThreadPool getIndexerThreadPool() {
-    return indexerThreadPool;
-  }
-
   /** By default, IndexWriter does not pool the
    *  SegmentReaders it must open for deletions and
    *  merging, unless a near-real-time reader has been
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index fb4bba7..1f48acc 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -80,10 +80,6 @@ public class LiveIndexWriterConfig {
   /** {@link MergePolicy} for selecting merges. */
   protected volatile MergePolicy mergePolicy;
 
-  /** {@code DocumentsWriterPerThreadPool} to control how
-   *  threads are allocated to {@code DocumentsWriterPerThread}. */
-  protected volatile DocumentsWriterPerThreadPool indexerThreadPool;
-
   /** True if readers should be pooled. */
   protected volatile boolean readerPooling;
 
@@ -135,7 +131,6 @@ public class LiveIndexWriterConfig {
     mergePolicy = new TieredMergePolicy();
     flushPolicy = new FlushByRamOrCountsPolicy();
     readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
-    indexerThreadPool = new DocumentsWriterPerThreadPool();
     perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
   }
   
@@ -348,16 +343,6 @@ public class LiveIndexWriterConfig {
   }
   
   /**
-   * Returns the configured {@link DocumentsWriterPerThreadPool} instance.
-   * 
-   * @see IndexWriterConfig#setIndexerThreadPool(DocumentsWriterPerThreadPool)
-   * @return the configured {@link DocumentsWriterPerThreadPool} instance.
-   */
-  DocumentsWriterPerThreadPool getIndexerThreadPool() {
-    return indexerThreadPool;
-  }
-
-  /**
    * Returns {@code true} if {@link IndexWriter} should pool readers even if
    * {@link DirectoryReader#open(IndexWriter)} has not been called.
    */
@@ -492,7 +477,6 @@ public class LiveIndexWriterConfig {
     sb.append("codec=").append(getCodec()).append("\n");
     sb.append("infoStream=").append(getInfoStream().getClass().getName()).append("\n");
     sb.append("mergePolicy=").append(getMergePolicy()).append("\n");
-    sb.append("indexerThreadPool=").append(getIndexerThreadPool()).append("\n");
     sb.append("readerPooling=").append(getReaderPooling()).append("\n");
     sb.append("perThreadHardLimitMB=").append(getRAMPerThreadHardLimitMB()).append("\n");
     sb.append("useCompoundFile=").append(getUseCompoundFile()).append("\n");
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
index 1184a88..0a8f3a8 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.LineFileDocs;
@@ -70,8 +69,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
 
     IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
                               .setFlushPolicy(flushPolicy);
-    DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-    iwc.setIndexerThreadPool(threadPool);
     iwc.setRAMBufferSizeMB(maxRamMB);
     iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
     IndexWriter writer = new IndexWriter(dir, iwc);
@@ -125,8 +122,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
       IndexWriterConfig iwc = newIndexWriterConfig(analyzer)
                                 .setFlushPolicy(flushPolicy);
 
-      DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-      iwc.setIndexerThreadPool(threadPool);
       iwc.setMaxBufferedDocs(2 + atLeast(10));
       iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
       IndexWriter writer = new IndexWriter(dir, iwc);
@@ -173,9 +168,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
     MockDefaultFlushPolicy flushPolicy = new MockDefaultFlushPolicy();
     iwc.setFlushPolicy(flushPolicy);
 
-    DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-    iwc.setIndexerThreadPool(threadPool);
-
     IndexWriter writer = new IndexWriter(dir, iwc);
     flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
     DocumentsWriter docsWriter = writer.getDocsWriter();
@@ -237,8 +229,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
       FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
       iwc.setFlushPolicy(flushPolicy);
       
-      DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool();
-      iwc.setIndexerThreadPool(threadPool);
       // with such a small ram buffer we should be stalled quite quickly
       iwc.setRAMBufferSizeMB(0.25);
       IndexWriter writer = new IndexWriter(dir, iwc);
@@ -273,13 +263,11 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
   }
 
   protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
-    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
+    Iterator<DocumentsWriterPerThread> allActiveWriter = flushControl.allActiveWriters();
     long bytesUsed = 0;
-    while (allActiveThreads.hasNext()) {
-      ThreadState next = allActiveThreads.next();
-      if (next.dwpt != null) {
-        bytesUsed += next.dwpt.bytesUsed();
-      }
+    while (allActiveWriter.hasNext()) {
+      DocumentsWriterPerThread next = allActiveWriter.next();
+      bytesUsed += next.bytesUsed();
     }
     assertEquals(bytesUsed, flushControl.activeBytes());
   }
@@ -332,81 +320,81 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
     boolean hasMarkedPending = false;
 
     @Override
-    public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
-      final ArrayList<ThreadState> pending = new ArrayList<>();
-      final ArrayList<ThreadState> notPending = new ArrayList<>();
+    public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+      final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
+      final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
       findPending(control, pending, notPending);
-      final boolean flushCurrent = state.flushPending;
-      final ThreadState toFlush;
-      if (state.flushPending) {
-        toFlush = state;
+      final boolean flushCurrent = perThread.isFlushPending();
+      final DocumentsWriterPerThread toFlush;
+      if (perThread.isFlushPending()) {
+        toFlush = perThread;
       } else {
         toFlush = null;
       }
-      super.onDelete(control, state);
+      super.onDelete(control, perThread);
       if (toFlush != null) {
         if (flushCurrent) {
           assertTrue(pending.remove(toFlush));
         } else {
           assertTrue(notPending.remove(toFlush));
         }
-        assertTrue(toFlush.flushPending);
+        assertTrue(toFlush.isFlushPending());
         hasMarkedPending = true;
       }
 
-      for (ThreadState threadState : notPending) {
-        assertFalse(threadState.flushPending);
+      for (DocumentsWriterPerThread dwpt : notPending) {
+        assertFalse(dwpt.isFlushPending());
       }
     }
 
     @Override
-    public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
-      final ArrayList<ThreadState> pending = new ArrayList<>();
-      final ArrayList<ThreadState> notPending = new ArrayList<>();
+    public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread dwpt) {
+      final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
+      final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
       findPending(control, pending, notPending);
-      final boolean flushCurrent = state.flushPending;
+      final boolean flushCurrent = dwpt.isFlushPending();
       long activeBytes = control.activeBytes();
-      final ThreadState toFlush;
-      if (state.flushPending) {
-        toFlush = state;
+      final DocumentsWriterPerThread toFlush;
+      if (dwpt.isFlushPending()) {
+        toFlush = dwpt;
       } else if (flushOnDocCount()
-          && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
+          && dwpt.getNumDocsInRAM() >= indexWriterConfig
               .getMaxBufferedDocs()) {
-        toFlush = state;
+        toFlush = dwpt;
       } else if (flushOnRAM()
           && activeBytes >= (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024. * 1024.)) {
-        toFlush = findLargestNonPendingWriter(control, state);
-        assertFalse(toFlush.flushPending);
+        toFlush = findLargestNonPendingWriter(control, dwpt);
+        assertFalse(toFlush.isFlushPending());
       } else {
         toFlush = null;
       }
-      super.onInsert(control, state);
+      super.onInsert(control, dwpt);
       if (toFlush != null) {
         if (flushCurrent) {
           assertTrue(pending.remove(toFlush));
         } else {
           assertTrue(notPending.remove(toFlush));
         }
-        assertTrue(toFlush.flushPending);
+        assertTrue(toFlush.isFlushPending());
         hasMarkedPending = true;
       } else {
         peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
-        peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(),
+        peakDocCountWithoutFlush = Math.max(dwpt.getNumDocsInRAM(),
             peakDocCountWithoutFlush);
       }
 
-      for (ThreadState threadState : notPending) {
-        assertFalse(threadState.flushPending);
+      for (DocumentsWriterPerThread perThread : notPending) {
+        assertFalse(perThread.isFlushPending());
       }
     }
   }
 
   static void findPending(DocumentsWriterFlushControl flushControl,
-      ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) {
-    Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
+      ArrayList<DocumentsWriterPerThread> pending, ArrayList<DocumentsWriterPerThread> notPending) {
+    Iterator<DocumentsWriterPerThread> allActiveThreads = flushControl.allActiveWriters();
     while (allActiveThreads.hasNext()) {
-      ThreadState next = allActiveThreads.next();
-      if (next.flushPending) {
+      DocumentsWriterPerThread next = allActiveThreads.next();
+      if (next.isFlushPending()) {
         pending.add(next);
       } else {
         notPending.add(next);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index d980ce3..bcddd93 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -22,7 +22,9 @@ import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.io.StringReader;
+import java.io.StringWriter;
 import java.net.URI;
 import java.nio.file.FileSystem;
 import java.nio.file.Files;
@@ -2919,16 +2921,15 @@ public class TestIndexWriter extends LuceneTestCase {
   public void testFlushLargestWriter() throws IOException, InterruptedException {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
-    int numDocs = indexDocsForMultipleThreadStates(w);
-    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+    int numDocs = indexDocsForMultipleDWPTs(w);
+    DocumentsWriterPerThread largestNonPendingWriter
         = w.docWriter.flushControl.findLargestNonPendingWriter();
-    assertFalse(largestNonPendingWriter.flushPending);
-    assertNotNull(largestNonPendingWriter.dwpt);
+    assertFalse(largestNonPendingWriter.isFlushPending());
 
     int numRamDocs = w.numRamDocs();
-    int numDocsInDWPT = largestNonPendingWriter.dwpt.getNumDocsInRAM();
+    int numDocsInDWPT = largestNonPendingWriter.getNumDocsInRAM();
     assertTrue(w.flushNextBuffer());
-    assertNull(largestNonPendingWriter.dwpt);
+    assertTrue(largestNonPendingWriter.hasFlushed());
     assertEquals(numRamDocs-numDocsInDWPT, w.numRamDocs());
 
     // make sure it's not locked
@@ -2944,7 +2945,7 @@ public class TestIndexWriter extends LuceneTestCase {
     dir.close();
   }
 
-  private int indexDocsForMultipleThreadStates(IndexWriter w) throws InterruptedException {
+  private int indexDocsForMultipleDWPTs(IndexWriter w) throws InterruptedException {
     Thread[] threads = new Thread[3];
     CountDownLatch latch = new CountDownLatch(threads.length);
     int numDocsPerThread = 10 + random().nextInt(30);
@@ -2974,16 +2975,16 @@ public class TestIndexWriter extends LuceneTestCase {
   public void testNeverCheckOutOnFullFlush() throws IOException, InterruptedException {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
-    indexDocsForMultipleThreadStates(w);
-    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+    indexDocsForMultipleDWPTs(w);
+    DocumentsWriterPerThread largestNonPendingWriter
         = w.docWriter.flushControl.findLargestNonPendingWriter();
-    assertFalse(largestNonPendingWriter.flushPending);
-    assertNotNull(largestNonPendingWriter.dwpt);
-    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
+    assertFalse(largestNonPendingWriter.isFlushPending());
+    assertFalse(largestNonPendingWriter.hasFlushed());
+    int threadPoolSize = w.docWriter.perThreadPool.size();
     w.docWriter.flushControl.markForFullFlush();
     DocumentsWriterPerThread documentsWriterPerThread = w.docWriter.flushControl.checkoutLargestNonPendingWriter();
     assertNull(documentsWriterPerThread);
-    assertEquals(activeThreadStateCount, w.docWriter.flushControl.numQueuedFlushes());
+    assertEquals(threadPoolSize, w.docWriter.flushControl.numQueuedFlushes());
     w.docWriter.flushControl.abortFullFlushes();
     assertNull("was aborted", w.docWriter.flushControl.checkoutLargestNonPendingWriter());
     assertEquals(0, w.docWriter.flushControl.numQueuedFlushes());
@@ -2994,11 +2995,11 @@ public class TestIndexWriter extends LuceneTestCase {
   public void testHoldLockOnLargestWriter() throws IOException, InterruptedException {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
-    int numDocs = indexDocsForMultipleThreadStates(w);
-    DocumentsWriterPerThreadPool.ThreadState largestNonPendingWriter
+    int numDocs = indexDocsForMultipleDWPTs(w);
+    DocumentsWriterPerThread largestNonPendingWriter
         = w.docWriter.flushControl.findLargestNonPendingWriter();
-    assertFalse(largestNonPendingWriter.flushPending);
-    assertNotNull(largestNonPendingWriter.dwpt);
+    assertFalse(largestNonPendingWriter.isFlushPending());
+    assertFalse(largestNonPendingWriter.hasFlushed());
 
     CountDownLatch wait = new CountDownLatch(1);
     CountDownLatch locked = new CountDownLatch(1);
@@ -3031,7 +3032,7 @@ public class TestIndexWriter extends LuceneTestCase {
     lockThread.join();
     flushThread.join();
 
-    assertNull("largest DWPT should be flushed", largestNonPendingWriter.dwpt);
+    assertTrue("largest DWPT should be flushed", largestNonPendingWriter.hasFlushed());
     // make sure it's not locked
     largestNonPendingWriter.lock();
     largestNonPendingWriter.unlock();
@@ -3117,21 +3118,19 @@ public class TestIndexWriter extends LuceneTestCase {
   }
 
   private static void waitForDocsInBuffers(IndexWriter w, int buffersWithDocs) {
-    // wait until at least N threadstates have a doc in order to observe
+    // wait until at least N DWPTs have a doc in order to observe
     // who flushes the segments.
     while(true) {
       int numStatesWithDocs = 0;
       DocumentsWriterPerThreadPool perThreadPool = w.docWriter.perThreadPool;
-      for (int i = 0; i < perThreadPool.getActiveThreadStateCount(); i++) {
-        DocumentsWriterPerThreadPool.ThreadState threadState = perThreadPool.getThreadState(i);
-        threadState.lock();
+      for (DocumentsWriterPerThread dwpt : perThreadPool) {
+        dwpt.lock();
         try {
-          DocumentsWriterPerThread dwpt = threadState.dwpt;
-          if (dwpt != null && dwpt.getNumDocsInRAM() > 1) {
+          if (dwpt.getNumDocsInRAM() > 1) {
             numStatesWithDocs++;
           }
         } finally {
-          threadState.unlock();
+          dwpt.unlock();
         }
       }
       if (numStatesWithDocs >= buffersWithDocs) {
@@ -3703,22 +3702,19 @@ public class TestIndexWriter extends LuceneTestCase {
     Directory dir = newDirectory();
     IndexWriter w = new IndexWriter(dir, new IndexWriterConfig());
     w.addDocument(new Document());
-    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
-    assertEquals(1, activeThreadStateCount);
+    assertEquals(1, w.docWriter.perThreadPool.size());
     CountDownLatch latch = new CountDownLatch(1);
     Thread thread = new Thread(() -> {
       latch.countDown();
       List<Closeable> states = new ArrayList<>();
       try {
         for (int i = 0; i < 100; i++) {
-          DocumentsWriterPerThreadPool.ThreadState state = w.docWriter.perThreadPool.getAndLock();
+          DocumentsWriterPerThread state = w.docWriter.perThreadPool.getAndLock();
           states.add(state::unlock);
-          if (state.isInitialized()) {
-            state.dwpt.deleteQueue.getNextSequenceNumber();
-          } else {
-            w.docWriter.deleteQueue.getNextSequenceNumber();
-          }
+          state.deleteQueue.getNextSequenceNumber();
         }
+      } catch (IOException e) {
+        throw new AssertionError(e);
       } finally {
         IOUtils.closeWhileHandlingException(states);
       }
@@ -3775,7 +3771,19 @@ public class TestIndexWriter extends LuceneTestCase {
       stopped.set(true);
       indexer.join();
       refresher.join();
-      assertNull("should not consider ACE a tragedy on a closed IW", w.getTragicException());
+      Throwable e = w.getTragicException();
+      IOSupplier<String> supplier = () -> {
+        if (e != null) {
+          StringWriter writer = new StringWriter();
+          try (PrintWriter printWriter = new PrintWriter(writer)) {
+            e.printStackTrace(printWriter);
+          }
+          return writer.toString();
+        } else {
+          return "";
+        }
+      };
+      assertNull("should not consider ACE a tragedy on a closed IW: " + supplier.get(), w.getTragicException());
       IOUtils.close(sm, dir);
     }
   }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index 7238869..dffb157 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -68,7 +68,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
     assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain());
     assertNull(conf.getMergedSegmentWarmer());
     assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass());
-    assertEquals(DocumentsWriterPerThreadPool.class, conf.getIndexerThreadPool().getClass());
     assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass());
     assertEquals(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.getRAMPerThreadHardLimitMB());
     assertEquals(Codec.getDefault(), conf.getCodec());
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
index b32ea30..e879019 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
@@ -1285,7 +1285,9 @@ public class TestIndexWriterDelete extends LuceneTestCase {
     }
 
     // First one triggers, but does not reflect, the merge:
-    System.out.println("TEST: now get reader");
+    if (VERBOSE) {
+      System.out.println("TEST: now get reader");
+    }
     DirectoryReader.open(w).close();
     IndexReader r = DirectoryReader.open(w);
     assertEquals(1, r.leaves().size());
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
index 9799e65..cd68636 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
@@ -533,8 +533,8 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase {
     dir.close();
   }
   
-  // LUCENE-1130: make sure immeidate disk full on creating
-  // an IndexWriter (hit during DW.ThreadState.init()) is
+  // LUCENE-1130: make sure immediate disk full on creating
+  // an IndexWriter (hit during DWPT#updateDocuments()) is
   // OK:
   public void testImmediateDiskFull() throws IOException {
     MockDirectoryWrapper dir = newMockDirectory();
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
index c71c509..3bf554f 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
@@ -136,7 +136,7 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
   }
 
   // LUCENE-1130: make sure immediate disk full on creating
-  // an IndexWriter (hit during DW.ThreadState.init()), with
+  // an IndexWriter (hit during DWPT#updateDocuments()), with
   // multiple threads, is OK:
   public void testImmediateDiskFullWithThreads() throws Exception {
 
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
index 0035033..c1a5c42 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
@@ -224,8 +224,8 @@ public class RandomIndexWriter implements Closeable {
     if (LuceneTestCase.VERBOSE) {
       System.out.println("RIW.add/updateDocument: now flushing the largest writer at docCount=" + docCount);
     }
-    int activeThreadStateCount = w.docWriter.perThreadPool.getActiveThreadStateCount();
-    int numFlushes = Math.min(1, r.nextInt(activeThreadStateCount+1));
+    int threadPoolSize = w.docWriter.perThreadPool.size();
+    int numFlushes = Math.min(1, r.nextInt(threadPoolSize+1));
     for (int i = 0; i < numFlushes; i++) {
       if (w.flushNextBuffer() == false) {
         break; // stop once we didn't flush anything
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
index 15f6454..6aff51f 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
@@ -272,17 +272,22 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
   }
 
   public synchronized void corruptUnknownFiles() throws IOException {
-
-    System.out.println("MDW: corrupt unknown files");
+    if (LuceneTestCase.VERBOSE) {
+      System.out.println("MDW: corrupt unknown files");
+    }
     Set<String> knownFiles = new HashSet<>();
     for(String fileName : listAll()) {
       if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
-        System.out.println("MDW: read " + fileName + " to gather files it references");
+        if (LuceneTestCase.VERBOSE) {
+          System.out.println("MDW: read " + fileName + " to gather files it references");
+        }
         SegmentInfos infos;
         try {
           infos = SegmentInfos.readCommit(this, fileName);
         } catch (IOException ioe) {
-          System.out.println("MDW: exception reading segment infos " + fileName + "; files: " + Arrays.toString(listAll()));
+          if (LuceneTestCase.VERBOSE) {
+            System.out.println("MDW: exception reading segment infos " + fileName + "; files: " + Arrays.toString(listAll()));
+          }
           throw ioe;
         }
         knownFiles.addAll(infos.files(true));
@@ -864,8 +869,9 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
           
         // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles
         if (assertNoUnreferencedFilesOnClose) {
-          System.out.println("MDW: now assert no unref'd files at close");
-
+          if (LuceneTestCase.VERBOSE) {
+            System.out.println("MDW: now assert no unref'd files at close");
+          }
           // now look for unreferenced files: discount ones that we tried to delete but could not
           Set<String> allFiles = new HashSet<>(Arrays.asList(listAll()));
           String[] startFiles = allFiles.toArray(new String[0]);