You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/03/14 16:43:01 UTC
svn commit: r1300596 - in /lucene/dev/branches/branch_3x: ./ lucene/
lucene/core/src/java/org/apache/lucene/util/
lucene/core/src/test/org/apache/lucene/util/
Author: mikemccand
Date: Wed Mar 14 15:43:00 2012
New Revision: 1300596
URL: http://svn.apache.org/viewvc?rev=1300596&view=rev
Log:
LUCENE-3841: fix CloseableThreadLocal to also purge entries periodically in get(), to avoid holding onto objects for too long
Modified:
lucene/dev/branches/branch_3x/ (props changed)
lucene/dev/branches/branch_3x/lucene/ (props changed)
lucene/dev/branches/branch_3x/lucene/CHANGES.txt
lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java
lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
Modified: lucene/dev/branches/branch_3x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/CHANGES.txt?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_3x/lucene/CHANGES.txt Wed Mar 14 15:43:00 2012
@@ -228,6 +228,10 @@ Bug fixes
sloppy query missed documents that exact query matched.
Fixed except when for repeating multiterms (e.g. "yes ho yes|no").
(Robert Muir, Doron Cohen)
+
+* LUCENE-3841: Fix CloseableThreadLocal to also purge stale entries on
+ get(); this fixes certain cases where we were holding onto objects
+ for dead threads for too long (Matthew Bellew, Mike McCandless)
Optimizations
Modified: lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java (original)
+++ lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/CloseableThreadLocal.java Wed Mar 14 15:43:00 2012
@@ -19,9 +19,10 @@ package org.apache.lucene.util;
import java.io.Closeable;
import java.lang.ref.WeakReference;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/** Java's builtin ThreadLocal has a serious flaw:
* it can take an arbitrarily long amount of time to
@@ -56,8 +57,19 @@ public class CloseableThreadLocal<T> imp
private ThreadLocal<WeakReference<T>> t = new ThreadLocal<WeakReference<T>>();
- private Map<Thread,T> hardRefs = new HashMap<Thread,T>();
+ // Use a WeakHashMap so that if a Thread exits and is
+ // GC'able, its entry may be removed:
+ private Map<Thread,T> hardRefs = new WeakHashMap<Thread,T>();
+ // Increase this to decrease frequency of purging in get:
+ private static int PURGE_MULTIPLIER = 20;
+
+ // On each get or set we decrement this; when it hits 0 we
+ // purge. After purge, we set this to
+ // PURGE_MULTIPLIER * stillAliveCount. This keeps
+ // amortized cost of purging linear.
+ private final AtomicInteger countUntilPurge = new AtomicInteger(PURGE_MULTIPLIER);
+
protected T initialValue() {
return null;
}
@@ -69,9 +81,11 @@ public class CloseableThreadLocal<T> imp
if (iv != null) {
set(iv);
return iv;
- } else
+ } else {
return null;
+ }
} else {
+ maybePurge();
return weakRef.get();
}
}
@@ -82,13 +96,35 @@ public class CloseableThreadLocal<T> imp
synchronized(hardRefs) {
hardRefs.put(Thread.currentThread(), object);
+ maybePurge();
+ }
+ }
- // Purge dead threads
+ private void maybePurge() {
+ if (countUntilPurge.getAndDecrement() == 0) {
+ purge();
+ }
+ }
+
+ // Purge dead threads
+ private void purge() {
+ synchronized(hardRefs) {
+ int stillAliveCount = 0;
for (Iterator<Thread> it = hardRefs.keySet().iterator(); it.hasNext();) {
final Thread t = it.next();
- if (!t.isAlive())
+ if (!t.isAlive()) {
it.remove();
+ } else {
+ stillAliveCount++;
+ }
+ }
+ int nextCount = (1+stillAliveCount) * PURGE_MULTIPLIER;
+ if (nextCount <= 0) {
+ // defensive: int overflow!
+ nextCount = 1000000;
}
+
+ countUntilPurge.set(nextCount);
}
}
Modified: lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java (original)
+++ lucene/dev/branches/branch_3x/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java Wed Mar 14 15:43:00 2012
@@ -17,7 +17,6 @@ package org.apache.lucene.util;
* limitations under the License.
*/
-import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -46,13 +45,12 @@ public final class PagedBytes {
private static final byte[] EMPTY_BYTES = new byte[0];
- public final static class Reader implements Closeable {
+ public final static class Reader {
private final byte[][] blocks;
private final int[] blockEnds;
private final int blockBits;
private final int blockMask;
private final int blockSize;
- private final CloseableThreadLocal<byte[]> threadBuffers = new CloseableThreadLocal<byte[]>();
public Reader(PagedBytes pagedBytes) {
blocks = new byte[pagedBytes.blocks.size()][];
@@ -79,6 +77,7 @@ public final class PagedBytes {
**/
public BytesRef fillSlice(BytesRef b, long start, int length) {
assert length >= 0: "length=" + length;
+ assert length <= blockSize+1;
final int index = (int) (start >> blockBits);
final int offset = (int) (start & blockMask);
b.length = length;
@@ -88,18 +87,10 @@ public final class PagedBytes {
b.offset = offset;
} else {
// Split
- byte[] buffer = threadBuffers.get();
- if (buffer == null) {
- buffer = new byte[length];
- threadBuffers.set(buffer);
- } else if (buffer.length < length) {
- buffer = ArrayUtil.grow(buffer, length);
- threadBuffers.set(buffer);
- }
- b.bytes = buffer;
+ b.bytes = new byte[length];
b.offset = 0;
- System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset);
- System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset));
+ System.arraycopy(blocks[index], offset, b.bytes, 0, blockSize-offset);
+ System.arraycopy(blocks[1+index], 0, b.bytes, blockSize-offset, length-(blockSize-offset));
}
return b;
}
@@ -216,25 +207,12 @@ public final class PagedBytes {
}
assert length >= 0: "length=" + length;
b.length = length;
- if (blockSize - offset >= length) {
- // Within block
- b.offset = offset;
- b.bytes = blocks[index];
- } else {
- // Split
- byte[] buffer = threadBuffers.get();
- if (buffer == null) {
- buffer = new byte[length];
- threadBuffers.set(buffer);
- } else if (buffer.length < length) {
- buffer = ArrayUtil.grow(buffer, length);
- threadBuffers.set(buffer);
- }
- b.bytes = buffer;
- b.offset = 0;
- System.arraycopy(blocks[index], offset, buffer, 0, blockSize-offset);
- System.arraycopy(blocks[1+index], 0, buffer, blockSize-offset, length-(blockSize-offset));
- }
+ // We always alloc a new block when writing w/ prefix;
+ // we could some day relax that and span two blocks:
+ assert blockSize - offset >= length;
+ // Within block
+ b.offset = offset;
+ b.bytes = blocks[index];
return b;
}
@@ -247,10 +225,6 @@ public final class PagedBytes {
public int[] getBlockEnds() {
return blockEnds;
}
-
- public void close() {
- threadBuffers.close();
- }
}
/** 1<<blockBits must be bigger than biggest single
@@ -375,6 +349,9 @@ public final class PagedBytes {
/** Copy bytes in, writing the length as a 1 or 2 byte
* vInt prefix. */
public long copyUsingLengthPrefix(BytesRef bytes) throws IOException {
+ if (bytes.length >= 32768) {
+ throw new IllegalArgumentException("max length is 32767 (got " + bytes.length + ")");
+ }
if (upto + bytes.length + 2 > blockSize) {
if (bytes.length + 2 > blockSize) {
Modified: lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java?rev=1300596&r1=1300595&r2=1300596&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java (original)
+++ lucene/dev/branches/branch_3x/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java Wed Mar 14 15:43:00 2012
@@ -17,7 +17,9 @@
package org.apache.lucene.util;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
@@ -26,7 +28,9 @@ public class TestPagedBytes extends Luce
public void testDataInputOutput() throws Exception {
for(int iter=0;iter<5*RANDOM_MULTIPLIER;iter++) {
- final PagedBytes p = new PagedBytes(_TestUtil.nextInt(random, 1, 20));
+ final int blockBits = _TestUtil.nextInt(random, 1, 20);
+ final int blockSize = 1 << blockBits;
+ final PagedBytes p = new PagedBytes(blockBits);
final DataOutput out = p.getDataOutput();
final int numBytes = random.nextInt(10000000);
@@ -43,7 +47,7 @@ public class TestPagedBytes extends Luce
}
}
- p.freeze(random.nextBoolean());
+ final PagedBytes.Reader reader = p.freeze(random.nextBoolean());
final DataInput in = p.getDataInput();
@@ -59,6 +63,48 @@ public class TestPagedBytes extends Luce
}
}
assertTrue(Arrays.equals(answer, verify));
+
+ final BytesRef slice = new BytesRef();
+ for(int iter2=0;iter2<100;iter2++) {
+ final int pos = random.nextInt(numBytes-1);
+ final int len = random.nextInt(Math.min(blockSize+1, numBytes - pos));
+ reader.fillSlice(slice, pos, len);
+ for(int byteUpto=0;byteUpto<len;byteUpto++) {
+ assertEquals(answer[pos + byteUpto], slice.bytes[slice.offset + byteUpto]);
+ }
+ }
+ }
+ }
+
+ public void testLengthPrefix() throws Exception {
+ for(int iter=0;iter<5*RANDOM_MULTIPLIER;iter++) {
+ final int blockBits = _TestUtil.nextInt(random, 2, 20);
+ final int blockSize = 1 << blockBits;
+ final PagedBytes p = new PagedBytes(blockBits);
+ final List<Integer> addresses = new ArrayList<Integer>();
+ final List<BytesRef> answers = new ArrayList<BytesRef>();
+ int totBytes = 0;
+ while(totBytes < 10000000 && answers.size() < 100000) {
+ final int len = random.nextInt(Math.min(blockSize-2, 32768));
+ final BytesRef b = new BytesRef();
+ b.bytes = new byte[len];
+ b.length = len;
+ b.offset = 0;
+ random.nextBytes(b.bytes);
+ answers.add(b);
+ addresses.add((int) p.copyUsingLengthPrefix(b));
+
+ totBytes += len;
+ }
+
+ final PagedBytes.Reader reader = p.freeze(random.nextBoolean());
+
+ final BytesRef slice = new BytesRef();
+
+ for(int idx=0;idx<answers.size();idx++) {
+ reader.fillSliceWithPrefix(slice, addresses.get(idx));
+ assertEquals(answers.get(idx), slice);
+ }
}
}
}