You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/03/14 16:43:01 UTC

svn commit: r1300596 - in /lucene/dev/branches/branch_3x: ./ lucene/ lucene/core/src/java/org/apache/lucene/util/ lucene/core/src/test/org/apache/lucene/util/

Author: mikemccand
Date: Wed Mar 14 15:43:00 2012
New Revision: 1300596

URL: http://svn.apache.org/viewvc?rev=1300596&view=rev
Log:
LUCENE-3841: fix CloseableThreadLocal to also purge entries periodically in get(), to avoid holding onto objects for too long

Modified:
    lucene/dev/branches/branch_3x/   (props changed)
    lucene/dev/branches/branch_3x/lucene/   (props changed)
    lucene/dev/branches/branch_3x/lucene/CHANGES.txt
    lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java
    lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
    lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java

Modified: lucene/dev/branches/branch_3x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/CHANGES.txt?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_3x/lucene/CHANGES.txt Wed Mar 14 15:43:00 2012
@@ -228,6 +228,10 @@ Bug fixes
   sloppy query missed documents that exact query matched. 
   Fixed except when for repeating multiterms (e.g. "yes ho yes|no").
   (Robert Muir, Doron Cohen)
+
+* LUCENE-3841: Fix CloseableThreadLocal to also purge stale entries on
+  get(); this fixes certain cases where we were holding onto objects
+  for dead threads for too long (Matthew Bellew, Mike McCandless)
     
 Optimizations
 

Modified: lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java (original)
+++ lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java Wed Mar 14 15:43:00 2012
@@ -19,9 +19,10 @@ package org.apache.lucene.util;
 
 import java.io.Closeable;
 import java.lang.ref.WeakReference;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Java's builtin ThreadLocal has a serious flaw:
  *  it can take an arbitrarily long amount of time to
@@ -56,8 +57,19 @@ public class CloseableThreadLocal<T> imp
 
   private ThreadLocal<WeakReference<T>> t = new ThreadLocal<WeakReference<T>>();
 
-  private Map<Thread,T> hardRefs = new HashMap<Thread,T>();
+  // Use a WeakHashMap so that if a Thread exits and is
+  // GC'able, its entry may be removed:
+  private Map<Thread,T> hardRefs = new WeakHashMap<Thread,T>();
   
+  // Increase this to decrease frequency of purging in get:
+  private static int PURGE_MULTIPLIER = 20;
+
+  // On each get or set we decrement this; when it hits 0 we
+  // purge.  After purge, we set this to
+  // PURGE_MULTIPLIER * (1+stillAliveCount).  This keeps
+  // amortized cost of purging linear.
+  private final AtomicInteger countUntilPurge = new AtomicInteger(PURGE_MULTIPLIER);
+
   protected T initialValue() {
     return null;
   }
@@ -69,9 +81,11 @@ public class CloseableThreadLocal<T> imp
       if (iv != null) {
         set(iv);
         return iv;
-      } else
+      } else {
         return null;
+      }
     } else {
+      maybePurge();
       return weakRef.get();
     }
   }
@@ -82,13 +96,35 @@ public class CloseableThreadLocal<T> imp
 
     synchronized(hardRefs) {
       hardRefs.put(Thread.currentThread(), object);
+      maybePurge();
+    }
+  }
 
-      // Purge dead threads
+  private void maybePurge() {
+    if (countUntilPurge.getAndDecrement() == 0) {
+      purge();
+    }
+  }
+
+  // Purge dead threads
+  private void purge() {
+    synchronized(hardRefs) {
+      int stillAliveCount = 0;
       for (Iterator<Thread> it = hardRefs.keySet().iterator(); it.hasNext();) {
         final Thread t = it.next();
-        if (!t.isAlive())
+        if (!t.isAlive()) {
           it.remove();
+        } else {
+          stillAliveCount++;
+        }
+      }
+      int nextCount = (1+stillAliveCount) * PURGE_MULTIPLIER;
+      if (nextCount <= 0) {
+        // defensive: int overflow!
+        nextCount = 1000000;
       }
+      
+      countUntilPurge.set(nextCount);
     }
   }
 

Modified: lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java (original)
+++ lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java Wed Mar 14 15:43:00 2012
@@ -17,7 +17,6 @@ package org.apache.lucene.util;
  * limitations under the License.
  */
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -46,13 +45,12 @@ public final class PagedBytes {
 
   private static final byte[] EMPTY_BYTES = new byte[0];
 
-  public final static class Reader implements Closeable {
+  public final static class Reader {
     private final byte[][] blocks;
     private final int[] blockEnds;
     private final int blockBits;
     private final int blockMask;
     private final int blockSize;
-    private final CloseableThreadLocal<byte[]> threadBuffers = new CloseableThreadLocal<byte[]>();
 
     public Reader(PagedBytes pagedBytes) {
       blocks = new byte[pagedBytes.blocks.size()][];
@@ -79,6 +77,7 @@ public final class PagedBytes {
      **/
     public BytesRef fillSlice(BytesRef b, long start, int length) {
       assert length >= 0: "length=" + length;
+      assert length <= blockSize+1;
       final int index = (int) (start >> blockBits);
       final int offset = (int) (start & blockMask);
       b.length = length;
@@ -88,18 +87,10 @@ public final class PagedBytes {
         b.offset = offset;
       } else {
         // Split
-        byte[] buffer = threadBuffers.get();
-        if (buffer == null) {
-          buffer = new byte[length];
-          threadBuffers.set(buffer);
-        } else if (buffer.length < length) {
-          buffer = ArrayUtil.grow(buffer, length);
-          threadBuffers.set(buffer);
-        }
-        b.bytes = buffer;
+        b.bytes = new byte[length];
         b.offset = 0;
-        System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset);
-        System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset));
+        System.arraycopy(blocks[index], offset, b.bytes, 0, blockSize-offset);
+        System.arraycopy(blocks[1+index], 0, b.bytes, blockSize-offset, length-(blockSize-offset));
       }
       return b;
     }
@@ -216,25 +207,12 @@ public final class PagedBytes {
       }
       assert length >= 0: "length=" + length;
       b.length = length;
-      if (blockSize - offset >= length) {
-        // Within block
-        b.offset = offset;
-        b.bytes = blocks[index];
-      } else {
-        // Split
-        byte[] buffer = threadBuffers.get();
-        if (buffer == null) {
-          buffer = new byte[length];
-          threadBuffers.set(buffer);
-        } else if (buffer.length < length) {
-          buffer = ArrayUtil.grow(buffer, length);
-          threadBuffers.set(buffer);
-        }
-        b.bytes = buffer;
-        b.offset = 0;
-        System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset);
-        System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset));
-      }
+      // We always alloc a new block when writing w/ prefix;
+      // we could some day relax that and span two blocks:
+      assert blockSize - offset >= length;
+      // Within block
+      b.offset = offset;
+      b.bytes = blocks[index];
       return b;
     }
 
@@ -247,10 +225,6 @@ public final class PagedBytes {
     public int[] getBlockEnds() {
       return blockEnds;
     }
-
-    public void close() {
-      threadBuffers.close();
-    }
   }
 
   /** 1<<blockBits must be bigger than biggest single
@@ -375,6 +349,9 @@ public final class PagedBytes {
   /** Copy bytes in, writing the length as a 1 or 2 byte
    *  vInt prefix. */
   public long copyUsingLengthPrefix(BytesRef bytes) throws IOException {
+    if (bytes.length >= 32768) {
+      throw new IllegalArgumentException("max length is 32767 (got " + bytes.length + ")");
+    }
 
     if (upto + bytes.length + 2 > blockSize) {
       if (bytes.length + 2 > blockSize) {

Modified: lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java (original)
+++ lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java Wed Mar 14 15:43:00 2012
@@ -17,7 +17,9 @@
 
 package org.apache.lucene.util;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
@@ -26,7 +28,9 @@ public class TestPagedBytes extends Luce
 
   public void testDataInputOutput() throws Exception {
     for(int iter=0;iter<5*RANDOM_MULTIPLIER;iter++) {
-      final PagedBytes p = new PagedBytes(_TestUtil.nextInt(random, 1, 20));
+      final int blockBits = _TestUtil.nextInt(random, 1, 20);
+      final int blockSize = 1 << blockBits;
+      final PagedBytes p = new PagedBytes(blockBits);
       final DataOutput out = p.getDataOutput();
       final int numBytes = random.nextInt(10000000);
 
@@ -43,7 +47,7 @@ public class TestPagedBytes extends Luce
         }
       }
 
-      p.freeze(random.nextBoolean());
+      final PagedBytes.Reader reader = p.freeze(random.nextBoolean());
 
       final DataInput in = p.getDataInput();
 
@@ -59,6 +63,48 @@ public class TestPagedBytes extends Luce
         }
       }
       assertTrue(Arrays.equals(answer, verify));
+
+      final BytesRef slice = new BytesRef();
+      for(int iter2=0;iter2<100;iter2++) {
+        final int pos = random.nextInt(numBytes-1);
+        final int len = random.nextInt(Math.min(blockSize+1, numBytes - pos));
+        reader.fillSlice(slice, pos, len);
+        for(int byteUpto=0;byteUpto<len;byteUpto++) {
+          assertEquals(answer[pos + byteUpto], slice.bytes[slice.offset + byteUpto]);
+        }
+      }
+    }
+  }
+
+  public void testLengthPrefix() throws Exception {
+    for(int iter=0;iter<5*RANDOM_MULTIPLIER;iter++) {
+      final int blockBits = _TestUtil.nextInt(random, 2, 20);
+      final int blockSize = 1 << blockBits;
+      final PagedBytes p = new PagedBytes(blockBits);
+      final List<Integer> addresses = new ArrayList<Integer>();
+      final List<BytesRef> answers = new ArrayList<BytesRef>();
+      int totBytes = 0;
+      while(totBytes < 10000000 && answers.size() < 100000) {
+        final int len = random.nextInt(Math.min(blockSize-2, 32768));
+        final BytesRef b = new BytesRef();
+        b.bytes = new byte[len];
+        b.length = len;
+        b.offset = 0;
+        random.nextBytes(b.bytes);
+        answers.add(b);
+        addresses.add((int) p.copyUsingLengthPrefix(b));
+
+        totBytes += len;
+      }
+
+      final PagedBytes.Reader reader = p.freeze(random.nextBoolean());
+
+      final BytesRef slice = new BytesRef();
+
+      for(int idx=0;idx<answers.size();idx++) {
+        reader.fillSliceWithPrefix(slice, addresses.get(idx));
+        assertEquals(answers.get(idx), slice);
+      }
     }
   }
 }