You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2015/09/01 19:58:30 UTC
drill git commit: DRILL-3684: CTAS : Memory Leak when using CTAS with
tpch sf100
Repository: drill
Updated Branches:
refs/heads/master bcc01c05a -> b0e9e7c08
DRILL-3684: CTAS : Memory Leak when using CTAS with tpch sf100
This closes #141
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b0e9e7c0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b0e9e7c0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b0e9e7c0
Branch: refs/heads/master
Commit: b0e9e7c085a3106a612e400cd1732b6ec6267268
Parents: bcc01c0
Author: adeneche <ad...@gmail.com>
Authored: Mon Aug 31 15:09:29 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Tue Sep 1 10:54:35 2015 -0700
----------------------------------------------------------------------
.../ParquetDirectByteBufferAllocator.java | 70 ++++++++++++++------
1 file changed, 50 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/b0e9e7c0/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 10c8fd1..059886d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -22,20 +22,23 @@ import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
import org.apache.drill.exec.ops.OperatorContext;
import parquet.bytes.ByteBufferAllocator;
+/**
+ * {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} to allocate and release
+ * {@link ByteBuffer} objects.<br>
+ * To properly release an allocated {@link ByteBuf}, this class keeps track of its corresponding {@link ByteBuffer}
+ * that was passed to the Parquet library.
+ */
public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
private final BufferAllocator allocator;
- private final HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
+ private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>();
public ParquetDirectByteBufferAllocator(OperatorContext o){
allocator = o.getAllocator();
@@ -50,32 +53,59 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
public ByteBuffer allocate(int sz) {
ByteBuf bb = allocator.buffer(sz);
ByteBuffer b = bb.nioBuffer(0, sz);
- allocatedBuffers.put(System.identityHashCode(b), bb);
- logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b));
+ final Key key = new Key(b);
+ allocatedBuffers.put(key, bb);
+ logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}", sz, key.hash);
return b;
}
@Override
public void release(ByteBuffer b) {
- Integer id = System.identityHashCode(b);
- ByteBuf bb = allocatedBuffers.get(id);
+ final Key key = new Key(b);
+ final ByteBuf bb = allocatedBuffers.get(key);
// The ByteBuffer passed in may already have been freed or not allocated by this allocator.
// If it is not found in the allocated buffers, do nothing
- if(bb!=null) {
- logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: "+System.identityHashCode(b));
+ if(bb != null) {
+ logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}", key.hash);
bb.release();
- allocatedBuffers.remove(id);
+ allocatedBuffers.remove(key);
}
}
- public void clear(){
- Iterator it = allocatedBuffers.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry pair = (Map.Entry)it.next();
- Integer id = (Integer)pair.getKey();
- ByteBuf bb = allocatedBuffers.get(id);
- bb.release();
- it.remove();
+ /**
+ * ByteBuffer wrapper that computes a fixed hashcode.
+ * <br><br>
+ * Parquet only handles {@link ByteBuffer} objects, so we need to use them as keys to keep track of their corresponding
+ * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used as a {@link HashMap} key as it is.<br>
+ * This class solves this by providing a fixed hashcode for {@link ByteBuffer} and uses reference equality in case
+ * of collisions (we don't need to compare the content of {@link ByteBuffer} because the object passed to
+ * {@link #release(ByteBuffer)} will be the same object returned from a previous {@link #allocate(int)}).
+ */
+ private class Key {
+ final int hash;
+ final ByteBuffer buffer;
+
+ Key(final ByteBuffer buffer) {
+ this.buffer = buffer;
+ // remember, we can't use buffer.hashCode()
+ this.hash = System.identityHashCode(buffer);
+ }
+
+ @Override
+ public int hashCode() {
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Key)) {
+ return false;
+ }
+ final Key key = (Key) obj;
+ return hash == key.hash && buffer == key.buffer;
}
}
}