You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2013/09/20 06:39:52 UTC

svn commit: r1524900 - in /lucene/dev/trunk/lucene/core/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/

Author: shaie
Date: Fri Sep 20 04:39:51 2013
New Revision: 1524900

URL: http://svn.apache.org/r1524900
Log:
LUCENE-5189: fixed concurrency bug

Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1524900&r1=1524899&r2=1524900&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Fri Sep 20 04:39:51 2013
@@ -433,6 +433,7 @@ public class IndexWriter implements Clos
       final ReadersAndLiveDocs rld = readerMap.get(info);
       if (rld != null) {
         assert info == rld.info;
+//        System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.drop: " + info);
         readerMap.remove(info);
         rld.dropReaders();
       }
@@ -459,6 +460,7 @@ public class IndexWriter implements Clos
       if (!poolReaders && rld.refCount() == 1) {
         // This is the last ref to this RLD, and we're not
         // pooling, so remove it:
+//        System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: " + rld.info);
         if (rld.writeLiveDocs(directory)) {
           // Make sure we only write del docs and field updates for a live segment:
           assert infoIsLive(rld.info);
@@ -467,6 +469,7 @@ public class IndexWriter implements Clos
           deleter.checkpoint(segmentInfos, false);
         }
 
+//        System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: drop readers " + rld.info);
         rld.dropReaders();
         readerMap.remove(rld.info);
       }
@@ -3157,7 +3160,9 @@ public class IndexWriter implements Clos
       assert rld != null: "seg=" + info.info.name;
       final Bits currentLiveDocs = rld.getLiveDocs();
       final Map<Integer,Map<String,Long>> mergingUpdates = rld.getMergingUpdates();
-      
+
+//      System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: info=" + info + ", mergingUpdates=" + mergingUpdates);
+
       if (prevLiveDocs != null) {
 
         // If we had deletions on starting the merge we must
@@ -3284,6 +3289,7 @@ public class IndexWriter implements Clos
 
     // set any updates that came while the segment was merging
     if (!mergedUpdates.isEmpty()) {
+//      System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedUpdates=" + mergedUpdates);
       assert mergedDeletes != null;
       mergedDeletes.setMergingUpdates(mergedUpdates);
     }
@@ -3331,6 +3337,7 @@ public class IndexWriter implements Clos
     }
 
     final ReadersAndLiveDocs mergedDeletes =  merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState);
+//    System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMerge: mergedDeletes=" + mergedDeletes);
 
     assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0 || mergedDeletes.hasFieldUpdates();
 
@@ -3364,6 +3371,7 @@ public class IndexWriter implements Clos
 
     if (mergedDeletes != null) {
       if (dropSegment) {
+//        System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMerge: dropChanges " + merge.info);
         mergedDeletes.dropChanges();
       }
       readerPool.release(mergedDeletes);
@@ -3677,7 +3685,7 @@ public class IndexWriter implements Clos
     setDiagnostics(si, SOURCE_MERGE, details);
     merge.setInfo(new SegmentInfoPerCommit(si, 0, -1L, -1L));
 
-//    System.out.println("[" + Thread.currentThread().getName() + "] _mergeInit: " + segString(merge.segments) + " into " + si);
+//    System.out.println("[" + Thread.currentThread().getName() + "] IW._mergeInit: " + segString(merge.segments) + " into " + si);
 
     // Lock order: IW -> BD
     bufferedDeletesStream.prune(segmentInfos);
@@ -3743,8 +3751,9 @@ public class IndexWriter implements Clos
           assert rld != null;
           if (drop) {
             rld.dropChanges();
+          } else {
+            rld.dropMergingUpdates();
           }
-          rld.setMerging(false);
           rld.release(sr);
           readerPool.release(rld);
           if (drop) {
@@ -3802,13 +3811,9 @@ public class IndexWriter implements Clos
         // Hold onto the "live" reader; we will use this to
         // commit merged deletes
         final ReadersAndLiveDocs rld = readerPool.get(info, true);
-        SegmentReader reader = rld.getReader(true, context);
+        SegmentReader reader = rld.getReaderForMerge(context);
         assert reader != null;
 
-        // Notify that we are merging, so that we can later copy the updates
-        // that were received while merging to the merged segment.
-        rld.setMerging(true);
-        
         // Carefully pull the most recent live docs:
         final Bits liveDocs;
         final int delCount;
@@ -3860,6 +3865,8 @@ public class IndexWriter implements Clos
         segUpto++;
       }
 
+//      System.out.println("[" + Thread.currentThread().getName() + "] IW.mergeMiddle: merging " + merge.getMergeReaders());
+      
       // we pass merge.getMergeReaders() instead of merge.readers to allow the
       // OneMerge to return a view over the actual segments to merge
       final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(),

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java?rev=1524900&r1=1524899&r2=1524900&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java Fri Sep 20 04:39:51 2013
@@ -170,29 +170,27 @@ class ReadersAndLiveDocs { // TODO (DVU_
     return reader;
   }
   
-  private SegmentReader doGetReaderWithUpdates(IOContext context) throws IOException {
+  private synchronized SegmentReader doGetReaderWithUpdates(IOContext context) throws IOException {
+    assert Thread.holdsLock(writer); // when we get here, we should already have the writer lock
     boolean checkpoint = false;
     try {
-      // don't synchronize the entire method because we cannot call
-      // writer.checkpoint() while holding the RLD lock, otherwise we might hit
-      // a deadlock w/ e.g. a concurrent merging thread.
-      synchronized (this) {
-        checkpoint = writeLiveDocs(info.info.dir);
-        if (reader == null) {
-          // We steal returned ref:
-          reader = new SegmentReader(info, context);
-          if (liveDocs == null) {
-            liveDocs = reader.getLiveDocs();
-          }
-        } else if (checkpoint) {
-          // enroll a new reader with the applied updates
-          reopenReader(context);
+      checkpoint = writeLiveDocs(info.info.dir);
+      if (reader == null) {
+        // We steal returned ref:
+//        System.out.println("[" + Thread.currentThread().getName() + "] RLD.doGetReaderWithUpdates: newSR " + info);
+        reader = new SegmentReader(info, context);
+        if (liveDocs == null) {
+          liveDocs = reader.getLiveDocs();
         }
-        
-        // Ref for caller
-        reader.incRef();
-        return reader;
+      } else if (checkpoint) {
+        // enroll a new reader with the applied updates
+//        System.out.println("[" + Thread.currentThread().getName() + "] RLD.doGetReaderWithUpdates: reopenReader " + info);
+        reopenReader(context);
       }
+      
+      // Ref for caller
+      reader.incRef();
+      return reader;
     } finally {
       if (checkpoint) {
         writer.checkpoint();
@@ -208,9 +206,11 @@ class ReadersAndLiveDocs { // TODO (DVU_
     // cost of obtaining it.
     if (applyFieldUpdates && hasFieldUpdates()) {
       synchronized (writer) {
+//        System.out.println("[" + Thread.currentThread().getName() + "] RLD.getReader: getReaderWithUpdates " + info);
         return doGetReaderWithUpdates(context);
       }
     } else {
+//      System.out.println("[" + Thread.currentThread().getName() + "] RLD.getReader: getReader no updates " + info);
       return doGetReader(context);
     }
   }
@@ -367,6 +367,7 @@ class ReadersAndLiveDocs { // TODO (DVU_
       if (hasFieldUpdates) {
         // reader could be null e.g. for a just merged segment (from
         // IndexWriter.commitMergedDeletes).
+//        if (this.reader == null) System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: newSR " + info);
         final SegmentReader reader = this.reader == null ? new SegmentReader(info, IOContext.READONCE) : this.reader;
         try {
           // clone FieldInfos so that we can update their numericUpdatesGen
@@ -396,6 +397,7 @@ class ReadersAndLiveDocs { // TODO (DVU_
           final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state);
           boolean fieldsConsumerSuccess = false;
           try {
+//            System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: applying updates; seg=" + info + " updates=" + numericUpdates);
             for (Entry<String,Map<Integer,Long>> e : numericUpdates.entrySet()) {
               final String field = e.getKey();
               final Map<Integer,Long> updates = e.getValue();
@@ -459,6 +461,7 @@ class ReadersAndLiveDocs { // TODO (DVU_
           }
         } finally {
           if (reader != this.reader) {
+//            System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: closeReader " + reader);
             reader.close();
           }
         }
@@ -501,7 +504,7 @@ class ReadersAndLiveDocs { // TODO (DVU_
     if (hasFieldUpdates) {
       info.advanceDocValuesGen();
       // copy all the updates to mergingUpdates, so they can later be applied to the merged segment
-      if (isMerging) {
+      if (isMerging || true) {
         copyUpdatesToMerging();
       }
       numericUpdates.clear();
@@ -513,6 +516,9 @@ class ReadersAndLiveDocs { // TODO (DVU_
   }
 
   private void copyUpdatesToMerging() {
+//    System.out.println("[" + Thread.currentThread().getName() + "] RLD.copyUpdatesToMerging: " + numericUpdates);
+    // cannot do a simple putAll, even if mergingUpdates is empty, because we
+    // need a shallow copy of the values (maps)
     for (Entry<String,Map<Integer,Long>> e : numericUpdates.entrySet()) {
       String field = e.getKey();
       Map<Integer,Long> merging = mergingUpdates.get(field);
@@ -525,19 +531,34 @@ class ReadersAndLiveDocs { // TODO (DVU_
   }
   
   /**
-   * Indicates whether this segment is currently being merged. Call this just
-   * before the segment is being merged with {@code true} and when the merge has
-   * finished and all updates have been applied to the merged segment, call this
-   * with {@code false}.
+   * Returns a reader for merge. This method applies field updates if there are
+   * any and marks that this segment is currently merging.
    */
-  public synchronized void setMerging(boolean isMerging) {
-    this.isMerging = isMerging;
-    if (!isMerging) {
-      mergingUpdates.clear();
+  SegmentReader getReaderForMerge(IOContext context) throws IOException {
+    // lock ordering must be IW -> RLD, otherwise could cause deadlocks
+    synchronized (writer) {
+      synchronized (this) {
+        // must execute these two statements as atomic operation, otherwise we
+        // could lose updates if e.g. another thread calls writeLiveDocs in
+        // between, or the updates are applied to the obtained reader, but then
+        // re-applied in IW.commitMergedDeletes (unnecessary work and potential
+        // bugs.
+        isMerging = true;
+        return getReader(true, context);
+      }
     }
   }
   
   /**
+   * Drops all merging updates. Called from IndexWriter after this segment
+   * finished merging (whether successfully or not).
+   */
+  public synchronized void dropMergingUpdates() {
+    mergingUpdates.clear();
+    isMerging = false;
+  }
+  
+  /**
    * Called from IndexWriter after applying deletes to the merged segment, while
    * it was being merged.
    */

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java?rev=1524900&r1=1524899&r2=1524900&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java Fri Sep 20 04:39:51 2013
@@ -82,7 +82,7 @@ public final class SegmentReader extends
   public SegmentReader(SegmentInfoPerCommit si, IOContext context) throws IOException {
     this.si = si;
     core = new SegmentCoreReaders(this, si.info.dir, si, context);
-    
+
     boolean success = false;
     final Codec codec = si.info.getCodec();
     try {
@@ -101,6 +101,8 @@ public final class SegmentReader extends
         // initialize the per generation numericDVProducers and put the correct
         // DVProducer for each field
         final Map<Long,List<FieldInfo>> genInfos = getGenInfos(si);
+        
+//        System.out.println("[" + Thread.currentThread().getName() + "] SR.init: new reader: " + si + "; gens=" + genInfos.keySet());
 
         for (Entry<Long,List<FieldInfo>> e : genInfos.entrySet()) {
           Long gen = e.getKey();
@@ -150,6 +152,8 @@ public final class SegmentReader extends
     this.core = sr.core;
     core.incRef();
     
+//    System.out.println("[" + Thread.currentThread().getName() + "] SR.init: sharing reader: " + sr + " for gens=" + sr.genDVProducers.keySet());
+    
     // increment refCount of DocValuesProducers that are used by this reader
     boolean success = false;
     try {
@@ -170,9 +174,11 @@ public final class SegmentReader extends
             if (dvp != null) {
               // gen used by given reader, incRef its DVP
               dvp.incRef();
+//              System.out.println("[" + Thread.currentThread().getName() + "] SR.init: sharing DVP for gen=" + gen + " refCount=" + dvp.getRefCount());
             } else {
               // this gen is not used by given reader, initialize a new one
               dvp = newDocValuesProducer(si, IOContext.READ, dir, dvFormat, gen, infos);
+//              System.out.println("[" + Thread.currentThread().getName() + "] SR.init: new DVP for gen=" + gen + " refCount=" + dvp.getRefCount());
             }
             assert dvp != null;
             genDVProducers.put(gen, dvp);

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java?rev=1524900&r1=1524899&r2=1524900&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java Fri Sep 20 04:39:51 2013
@@ -707,8 +707,8 @@ public class TestNumericDocValuesUpdates
     int numRounds = atLeast(15);
     int docID = 0;
     for (int i = 0; i < numRounds; i++) {
-      int numDocs = atLeast(2);
-      //      System.out.println("round=" + i + ", numDocs=" + numDocs);
+      int numDocs = atLeast(5);
+//      System.out.println("[" + Thread.currentThread().getName() + "]: round=" + i + ", numDocs=" + numDocs);
       for (int j = 0; j < numDocs; j++) {
         Document doc = new Document();
         doc.add(new StringField("id", "doc-" + docID, Store.NO));
@@ -724,12 +724,12 @@ public class TestNumericDocValuesUpdates
       int fieldIdx = random.nextInt(fieldValues.length);
       String updateField = "f" + fieldIdx;
       writer.updateNumericDocValue(new Term("key", "all"), updateField, ++fieldValues[fieldIdx]);
-      //      System.out.println("+++ updated field '" + updateField + "' to value " + fieldValues[fieldIdx]);
+//      System.out.println("[" + Thread.currentThread().getName() + "]: updated field '" + updateField + "' to value " + fieldValues[fieldIdx]);
       
       if (random.nextDouble() < 0.2) {
         int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok!
         writer.deleteDocuments(new Term("id", "doc-" + deleteDoc));
-        //        System.out.println("--- deleted document: doc-" + deleteDoc);
+//        System.out.println("[" + Thread.currentThread().getName() + "]: deleted document: doc-" + deleteDoc);
       }
       
       // verify reader
@@ -737,14 +737,16 @@ public class TestNumericDocValuesUpdates
         writer.commit();
       }
       
+//      System.out.println("[" + Thread.currentThread().getName() + "]: reopen reader: " + reader);
       DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
       assertNotNull(newReader);
       reader.close();
       reader = newReader;
+//      System.out.println("[" + Thread.currentThread().getName() + "]: reopened reader: " + reader);
       assertTrue(reader.numDocs() > 0); // we delete at most one document per round
       for (AtomicReaderContext context : reader.leaves()) {
         AtomicReader r = context.reader();
-        //        System.out.println(((SegmentReader) r).getSegmentName());
+//        System.out.println(((SegmentReader) r).getSegmentName());
         Bits liveDocs = r.getLiveDocs();
         for (int field = 0; field < fieldValues.length; field++) {
           String f = "f" + field;
@@ -754,12 +756,12 @@ public class TestNumericDocValuesUpdates
           for (int doc = 0; doc < maxDoc; doc++) {
             if (liveDocs == null || liveDocs.get(doc)) {
               //              System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' vslue=" + ndv.get(doc));
-              assertEquals("invalid value for doc=" + (doc + context.docBase) + ", field=" + f, fieldValues[field], ndv.get(doc));
+              assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], ndv.get(doc));
             }
           }
         }
       }
-      //      System.out.println();
+//      System.out.println();
     }
     
     IOUtils.close(writer, reader, dir);



Re: svn commit: r1524900 - in /lucene/dev/trunk/lucene/core/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/

Posted by Shai Erera <se...@gmail.com>.
No :). Fixed. Thanks for catching!


On Fri, Sep 20, 2013 at 7:50 AM, Robert Muir <rc...@gmail.com> wrote:

> On Fri, Sep 20, 2013 at 12:39 AM,  <sh...@apache.org> wrote:
> > @@ -501,7 +504,7 @@ class ReadersAndLiveDocs { // TODO (DVU_
> >      if (hasFieldUpdates) {
> >        info.advanceDocValuesGen();
> >        // copy all the updates to mergingUpdates, so they can later be
> applied to the merged segment
> > -      if (isMerging) {
> > +      if (isMerging || true) {
> >          copyUpdatesToMerging();
> >        }
> >        numericUpdates.clear();
>
> Is this really intentional?
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
> For additional commands, e-mail: dev-help@lucene.apache.org
>
>

Re: svn commit: r1524900 - in /lucene/dev/trunk/lucene/core/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/

Posted by Robert Muir <rc...@gmail.com>.
On Fri, Sep 20, 2013 at 12:39 AM,  <sh...@apache.org> wrote:
> @@ -501,7 +504,7 @@ class ReadersAndLiveDocs { // TODO (DVU_
>      if (hasFieldUpdates) {
>        info.advanceDocValuesGen();
>        // copy all the updates to mergingUpdates, so they can later be applied to the merged segment
> -      if (isMerging) {
> +      if (isMerging || true) {
>          copyUpdatesToMerging();
>        }
>        numericUpdates.clear();

Is this really intentional?

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org