Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/07 12:45:05 UTC

[11/50] [abbrv] lucene-solr:jira/solr-11285-sim: LUCENE-8043: Fix document accounting in IndexWriter

LUCENE-8043: Fix document accounting in IndexWriter

The IndexWriter check for too many documents does not always work, so the
configured limit can be exceeded. Once that happens, Lucene refuses to open the
index and throws a CorruptIndexException: Too many documents.
This change also fixes document accounting if the index writer hits an aborting
exception and/or the writer is rolled back. Pending document counts are now consistent
with the latest SegmentInfos once the writer has been rolled back.
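
At its core the fix funnels every update to IndexWriter's pendingNumDocs counter
through a single choke point: room is reserved before documents are added and
released whenever a flush aborts, a segment is dropped, or the writer is rolled
back. A minimal standalone sketch of that reserve/release pattern, mirroring the
reserveDocs and adjustPendingNumDocs logic in the diff below (DocCountReserver
is a hypothetical name, not Lucene API):

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for IndexWriter's pendingNumDocs accounting.
    final class DocCountReserver {
      private final AtomicLong pendingNumDocs = new AtomicLong();
      private final long maxDocs;

      DocCountReserver(long maxDocs) {
        this.maxDocs = maxDocs;
      }

      // Reserve room for addedNumDocs, or undo the reservation and fail,
      // mirroring IndexWriter#reserveDocs.
      void reserve(long addedNumDocs) {
        assert addedNumDocs >= 0;
        if (adjust(addedNumDocs) > maxDocs) {
          adjust(-addedNumDocs); // put the docs back before throwing
          throw new IllegalArgumentException(
              "number of documents in the index cannot exceed " + maxDocs);
        }
      }

      // Release a reservation, e.g. after an aborted flush, a dropped
      // segment, or a rollback.
      void release(long numDocs) {
        adjust(-numDocs);
      }

      // Mirrors IndexWriter#adjustPendingNumDocs: the counter must never
      // go negative.
      private long adjust(long numDocs) {
        long count = pendingNumDocs.addAndGet(numDocs);
        assert count >= 0 : "pendingNumDocs is negative: " + count;
        return count;
      }
    }

The exception message above is the one the new TestIndexTooManyDocs asserts
when the limit is hit.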


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b7d8731b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b7d8731b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b7d8731b

Branch: refs/heads/jira/solr-11285-sim
Commit: b7d8731bbf2a9278c22efa5a7fb43285236c90ba
Parents: 52cefbe
Author: Simon Willnauer <si...@apache.org>
Authored: Thu Nov 30 18:56:21 2017 +0100
Committer: Simon Willnauer <si...@apache.org>
Committed: Mon Dec 4 21:49:24 2017 +0100

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |   4 +
 .../apache/lucene/index/DocumentsWriter.java    |   5 +-
 .../lucene/index/DocumentsWriterPerThread.java  |   4 +-
 .../org/apache/lucene/index/IndexWriter.java    |  79 +++++++++----
 .../org/apache/lucene/index/SegmentInfos.java   |   4 +-
 .../lucene/index/TestIndexTooManyDocs.java      | 114 +++++++++++++++++++
 6 files changed, 180 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7d8731b/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index c861566..1ef55cd 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -157,6 +157,10 @@ Bug Fixes
 * LUCENE-8055: MemoryIndex.MemoryDocValuesIterator returns 2 documents 
   instead of 1. (Simon Willnauer)
 
+* LUCENE-8043: Fix document accounting in IndexWriter to prevent writing too many
+  documents; once the limit is exceeded, Lucene refuses to open the index and
+  throws a CorruptIndexException. (Simon Willnauer, Yonik Seeley, Mike McCandless)
+
 Build
 
 * LUCENE-6144: Upgrade Ivy to 2.4.0; 'ant ivy-bootstrap' now removes old Ivy

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7d8731b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 6aca9f4..d4e4e23 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -549,7 +549,6 @@ final class DocumentsWriter implements Closeable, Accountable {
         try {
           // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
           ticket = ticketQueue.addFlushTicket(flushingDWPT);
-  
           final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
           boolean dwptSuccess = false;
           try {
@@ -681,7 +680,9 @@ final class DocumentsWriter implements Closeable, Accountable {
         ticketQueue.addDeletes(flushingDeleteQueue);
       }
       ticketQueue.forcePurge(writer);
-      assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
+      // we can't assert that the ticket queue is empty since a DocumentsWriterDeleteQueue might be added
+      // concurrently; with very small RAM buffers this happens quite frequently
+      assert !flushingDeleteQueue.anyChanges();
     } finally {
       assert flushingDeleteQueue == currentFullFlushDelQueue;
     }
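
The relaxed assertion above reflects a general concurrency point: asserting that
a queue other threads may still feed is empty right after draining it is
inherently racy. A hedged sketch of the race in plain Java (not Lucene code):

    import java.util.concurrent.ConcurrentLinkedQueue;

    class RacyEmptinessCheck {
      public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> tickets = new ConcurrentLinkedQueue<>();
        Thread producer = new Thread(() -> {
          for (int i = 0; i < 1_000_000; i++) {
            tickets.add("ticket-" + i);
          }
        });
        producer.start();
        while (producer.isAlive()) {
          while (tickets.poll() != null) {
            // drain everything currently visible
          }
          // Unsound: the producer can enqueue another ticket between the
          // last poll() and this check, so the assertion may trip even
          // though nothing is wrong:
          // assert tickets.isEmpty();
        }
        producer.join();
      }
    }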

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7d8731b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 94ffba7..76c2906 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -118,6 +118,7 @@ class DocumentsWriterPerThread {
   void abort() {
     //System.out.println(Thread.currentThread().getName() + ": now abort seg=" + segmentInfo.name);
     aborted = true;
+    pendingNumDocs.addAndGet(-numDocsInRAM);
     try {
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "now abort");
@@ -491,7 +492,7 @@ class DocumentsWriterPerThread {
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0)/1000000.0) + " msec");
       }
-      
+
       return fs;
     } catch (Throwable th) {
       abort();
@@ -522,7 +523,6 @@ class DocumentsWriterPerThread {
    */
   void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap) throws IOException {
     assert flushedSegment != null;
-
     SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
 
     IndexWriter.setDiagnostics(newSegment.info, IndexWriter.SOURCE_FLUSH);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7d8731b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 6059218..a7e050d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -551,13 +552,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       return true;
     }
 
-    public synchronized void drop(SegmentCommitInfo info) throws IOException {
+    public synchronized boolean drop(SegmentCommitInfo info) throws IOException {
       final ReadersAndUpdates rld = readerMap.get(info);
       if (rld != null) {
         assert info == rld.info;
         readerMap.remove(info);
         rld.dropReaders();
+        return true;
       }
+      return false;
     }
 
     public synchronized long ramBytesUsed() {
@@ -726,7 +729,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       final Iterator<Map.Entry<SegmentCommitInfo,ReadersAndUpdates>> it = readerMap.entrySet().iterator();
       while(it.hasNext()) {
         final ReadersAndUpdates rld = it.next().getValue();
-
         try {
           if (doSave && rld.writeLiveDocs(directory)) {
             // Make sure we only write del docs and field updates for a live segment:
@@ -1097,7 +1099,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         rollbackSegments = segmentInfos.createBackupSegmentInfos();
       }
 
-      commitUserData = new HashMap<String,String>(segmentInfos.getUserData()).entrySet();
+      commitUserData = new HashMap<>(segmentInfos.getUserData()).entrySet();
 
       pendingNumDocs.set(segmentInfos.totalMaxDoc());
 
@@ -1267,6 +1269,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         if (infoStream.isEnabled("IW")) {
           infoStream.message("IW", "now flush at close");
         }
+
         flush(true, true);
         waitForMerges();
         commitInternal(config.getMergePolicy());
@@ -1617,9 +1620,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     // merge will skip merging it and will then drop
     // it once it's done:
     if (mergingSegments.contains(info) == false) {
-      segmentInfos.remove(info);
-      pendingNumDocs.addAndGet(-info.info.maxDoc());
-      readerPool.drop(info);
+      // it's possible that we invoke this method more than once for the same SCI
+      // we must only remove the docs once!
+      boolean dropPendingDocs = segmentInfos.remove(info);
+      try {
+        // this is sneaky: we might hit an exception while dropping a reader, but by then we have already
+        // removed the segmentInfo from segmentInfos and would lose the pendingDocs update as a result.
+        // therefore we execute adjustPendingNumDocs in a finally block to account for that.
+        dropPendingDocs |= readerPool.drop(info);
+      } finally {
+        if (dropPendingDocs) {
+          adjustPendingNumDocs(-info.info.maxDoc());
+        }
+      }
     }
   }
 
@@ -2342,6 +2355,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     // Make sure no commit is running, else e.g. we can close while another thread is still fsync'ing:
     synchronized(commitLock) {
       rollbackInternalNoCommit();
+
+      assert pendingNumDocs.get() == segmentInfos.totalMaxDoc()
+          : "pendingNumDocs " + pendingNumDocs.get() + " != " + segmentInfos.totalMaxDoc() + " totalMaxDoc";
     }
   }
 
@@ -2354,7 +2370,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     
     try {
       abortMerges();
-
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "rollback: done finish merges");
       }
@@ -2365,6 +2380,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
       docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes 
       docWriter.abort(this); // don't sync on IW here
+      docWriter.flushControl.waitForFlush(); // wait for all concurrently running flushes
+      purge(true); // empty the flush ticket queue otherwise we might not have cleaned up all resources
       synchronized(this) {
 
         if (pendingCommit != null) {
@@ -2376,15 +2393,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             notifyAll();
           }
         }
-
         // Don't bother saving any changes in our segmentInfos
         readerPool.dropAll(false);
-
+        final int totalMaxDoc = segmentInfos.totalMaxDoc();
         // Keep the same segmentInfos instance but replace all
         // of its SegmentInfo instances so IFD below will remove
         // any segments we flushed since the last commit:
         segmentInfos.rollbackSegmentInfos(rollbackSegments);
-
+        int rollbackMaxDoc = segmentInfos.totalMaxDoc();
+        // now we need to adjust the counter back to the rolled-back SegmentInfos, but don't set it to the
+        // absolute value since doing so might hide internal bugs
+        adjustPendingNumDocs(-(totalMaxDoc-rollbackMaxDoc));
         if (infoStream.isEnabled("IW") ) {
           infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
         }
@@ -2495,9 +2514,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      */
     try {
       synchronized (fullFlushLock) { 
-        long abortedDocCount = docWriter.lockAndAbortAll(this);
-        pendingNumDocs.addAndGet(-abortedDocCount);
-        
+        docWriter.lockAndAbortAll(this);
         processEvents(false, true);
         synchronized (this) {
           try {
@@ -2505,11 +2522,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             abortMerges();
             // Let merges run again
             stopMerges = false;
+            adjustPendingNumDocs(-segmentInfos.totalMaxDoc());
             // Remove all segments
-            pendingNumDocs.addAndGet(-segmentInfos.totalMaxDoc());
             segmentInfos.clear();
             // Ask deleter to locate unreferenced files & remove them:
             deleter.checkpoint(segmentInfos, false);
+
             /* don't refresh the deleter here since there might
              * be concurrent indexing requests coming in opening
              * files on the directory after we called DW#abort()
@@ -2522,12 +2540,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             changeCount.incrementAndGet();
             segmentInfos.changed();
             globalFieldNumberMap.clear();
-
             success = true;
             long seqNo = docWriter.deleteQueue.getNextSequenceNumber();
             docWriter.setLastSeqNo(seqNo);
             return seqNo;
-
           } finally {
             docWriter.unlockAllAfterAbortAll(this);
             if (!success) {
@@ -2660,6 +2676,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   synchronized void publishFlushedSegment(SegmentCommitInfo newSegment,
                                           FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
                                           Sorter.DocMap sortMap) throws IOException {
+    boolean published = false;
     try {
       // Lock order IW -> BDS
       ensureOpen(false);
@@ -2695,6 +2712,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       }
       newSegment.setBufferedDeletesGen(nextGen);
       segmentInfos.add(newSegment);
+      published = true;
       checkpoint();
 
       if (packet != null && packet.any() && sortMap != null) {
@@ -2705,6 +2723,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       }
       
     } finally {
+      if (published == false) {
+        adjustPendingNumDocs(-newSegment.info.maxDoc());
+      }
       flushCount.incrementAndGet();
       doAfterFlush();
     }
@@ -3894,7 +3915,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // doing this  makes  MockDirWrapper angry in
       // TestNRTThreads (LUCENE-5434):
       readerPool.drop(merge.info);
-
       // Safe: these files must exist:
       deleteNewFiles(merge.info.files());
       return false;
@@ -3955,9 +3975,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
     // Now deduct the deleted docs that we just reclaimed from this
     // merge:
-    int delDocCount = merge.totalMaxDoc - merge.info.info.maxDoc();
+    int delDocCount;
+    if (dropSegment) {
+      // if we drop the segment we have to reduce pendingNumDocs by merge.totalMaxDoc since we never drop
+      // the docs when applying deletes while the segment is being merged.
+      delDocCount = merge.totalMaxDoc;
+    } else {
+      delDocCount = merge.totalMaxDoc - merge.info.info.maxDoc();
+    }
     assert delDocCount >= 0;
-    pendingNumDocs.addAndGet(-delDocCount);
+    adjustPendingNumDocs(-delDocCount);
 
     if (dropSegment) {
       assert !segmentInfos.contains(merge.info);
@@ -5072,11 +5099,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   }
   
   private void processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
-    boolean processed = false;
     if (tragedy == null) {
       Event event;
       while ((event = queue.poll()) != null)  {
-        processed = true;
         event.process(this, triggerMerge, forcePurge);
       }
     }
@@ -5088,7 +5113,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * encoded inside the {@link #process(IndexWriter, boolean, boolean)} method.
    *
    */
-  static interface Event {
+  interface Event {
     
     /**
      * Processes the event. This method is called by the {@link IndexWriter}
@@ -5111,9 +5136,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    *  IllegalArgumentException} if it's not allowed. */ 
   private void reserveDocs(long addedNumDocs) {
     assert addedNumDocs >= 0;
-    if (pendingNumDocs.addAndGet(addedNumDocs) > actualMaxDocs) {
+    if (adjustPendingNumDocs(addedNumDocs) > actualMaxDocs) {
       // Reserve failed: put the docs back and throw exc:
-      pendingNumDocs.addAndGet(-addedNumDocs);
+      adjustPendingNumDocs(-addedNumDocs);
       tooManyDocs(addedNumDocs);
     }
   }
@@ -5142,4 +5167,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     ensureOpen();
     return docWriter.getMaxCompletedSequenceNumber();
   }
+
+  private long adjustPendingNumDocs(long numDocs) {
+    long count = pendingNumDocs.addAndGet(numDocs);
+    assert count >= 0 : "pendingNumDocs is negative: " + count;
+    return count;
+  }
 }
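
The dropDeletedSegment hunk above deserves a closer look: the pendingNumDocs
decrement is keyed off a boolean and performed in a finally block, so it runs at
most once per segment and cannot be skipped when dropping the pooled reader
throws. A standalone sketch of the idiom (SegmentRegistry and its methods are
hypothetical, not Lucene API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    final class SegmentRegistry {
      private final Map<String, Integer> segments = new ConcurrentHashMap<>(); // name -> maxDoc
      private final AtomicLong pendingNumDocs = new AtomicLong();

      void add(String name, int maxDoc) {
        segments.put(name, maxDoc);
        pendingNumDocs.addAndGet(maxDoc);
      }

      // Safe to call more than once for the same segment; the counter is
      // adjusted at most once, even if dropReaders() throws.
      void drop(String name, int maxDoc) {
        boolean removed = segments.remove(name) != null; // may be a no-op on repeat calls
        try {
          removed |= dropReaders(name);                  // may throw
        } finally {
          if (removed) {
            pendingNumDocs.addAndGet(-maxDoc);           // runs even on exception
          }
        }
      }

      private boolean dropReaders(String name) {
        return false; // stand-in for releasing pooled readers, which may throw
      }
    }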

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7d8731b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
index f796d34..008b6e3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
@@ -972,8 +972,8 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
   /** Remove the provided {@link SegmentCommitInfo}.
    *
    * <p><b>WARNING</b>: O(N) cost */
-  public void remove(SegmentCommitInfo si) {
-    segments.remove(si);
+  public boolean remove(SegmentCommitInfo si) {
+    return segments.remove(si);
   }
   
   /** Remove the {@link SegmentCommitInfo} at the

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b7d8731b/lucene/core/src/test/org/apache/lucene/index/TestIndexTooManyDocs.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexTooManyDocs.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexTooManyDocs.java
new file mode 100644
index 0000000..78305f4
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexTooManyDocs.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestIndexTooManyDocs extends LuceneTestCase {
+
+  /*
+   * This test produces a boatload of very small segments with lots of deletes which are likely to delete
+   * entire segments. See https://issues.apache.org/jira/browse/LUCENE-8043
+   */
+  public void testIndexTooManyDocs() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    int numMaxDoc = 25;
+    IndexWriterConfig config = new IndexWriterConfig();
+    config.setRAMBufferSizeMB(0.000001); // force lots of small segments and lots of concurrent deletes
+    IndexWriter writer = new IndexWriter(dir, config);
+    try {
+      IndexWriter.setMaxDocs(numMaxDoc);
+      int numThreads = 5 + random().nextInt(5);
+      Thread[] threads = new Thread[numThreads];
+      CountDownLatch latch = new CountDownLatch(numThreads);
+      CountDownLatch indexingDone = new CountDownLatch(numThreads - 2);
+      AtomicBoolean done = new AtomicBoolean(false);
+      for (int i = 0; i < numThreads; i++) {
+        if (i >= 2) {
+          threads[i] = new Thread(() -> {
+            latch.countDown();
+            try {
+              latch.await();
+            } catch (InterruptedException e) {
+              throw new AssertionError(e);
+            }
+            for (int d = 0; d < 100; d++) {
+              Document doc = new Document();
+              String id = Integer.toString(random().nextInt(numMaxDoc * 2));
+              doc.add(new StringField("id", id, Field.Store.NO));
+              try {
+                Term t = new Term("id", id);
+                if (random().nextInt(5) == 0) {
+                  writer.deleteDocuments(new TermQuery(t));
+                }
+                writer.updateDocument(t, doc);
+              } catch (IOException e) {
+                throw new AssertionError(e);
+              } catch (IllegalArgumentException e) {
+                assertEquals("number of documents in the index cannot exceed " + IndexWriter.getActualMaxDocs(), e.getMessage());
+              }
+            }
+            indexingDone.countDown();
+          });
+        } else {
+          threads[i] = new Thread(() -> {
+            try {
+              latch.countDown();
+              latch.await();
+              DirectoryReader open = DirectoryReader.open(writer, true, true);
+              while (done.get() == false) {
+                DirectoryReader directoryReader = DirectoryReader.openIfChanged(open);
+                if (directoryReader != null) {
+                  open.close();
+                  open = directoryReader;
+                }
+              }
+              IOUtils.closeWhileHandlingException(open);
+            } catch (Exception e) {
+              throw new AssertionError(e);
+            }
+          });
+        }
+        threads[i].start();
+      }
+
+      indexingDone.await();
+      done.set(true);
+
+
+      for (int i = 0; i < numThreads; i++) {
+        threads[i].join();
+      }
+      writer.close();
+      dir.close();
+    } finally {
+      IndexWriter.setMaxDocs(IndexWriter.MAX_DOCS);
+    }
+  }
+}
\ No newline at end of file
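
For anyone who wants to beast the new test, the usual Lucene Ant harness should
work from lucene/core, e.g. ant test -Dtestcase=TestIndexTooManyDocs (property
name assumed from the common build).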