Posted to commits@drill.apache.org by ad...@apache.org on 2015/09/01 19:58:30 UTC

drill git commit: DRILL-3684: CTAS : Memory Leak when using CTAS with tpch sf100

Repository: drill
Updated Branches:
  refs/heads/master bcc01c05a -> b0e9e7c08


DRILL-3684: CTAS : Memory Leak when using CTAS with tpch sf100

This closes #141


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b0e9e7c0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b0e9e7c0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b0e9e7c0

Branch: refs/heads/master
Commit: b0e9e7c085a3106a612e400cd1732b6ec6267268
Parents: bcc01c0
Author: adeneche <ad...@gmail.com>
Authored: Mon Aug 31 15:09:29 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Tue Sep 1 10:54:35 2015 -0700

----------------------------------------------------------------------
 .../ParquetDirectByteBufferAllocator.java       | 70 ++++++++++++++------
 1 file changed, 50 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b0e9e7c0/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 10c8fd1..059886d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -22,20 +22,23 @@ import io.netty.buffer.ByteBuf;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.OperatorContext;
 
 import parquet.bytes.ByteBufferAllocator;
 
+/**
+ * {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} to allocate and release
+ * {@link ByteBuffer} objects.<br>
+ * To properly release an allocated {@link ByteBuf}, this class keeps track of its corresponding {@link ByteBuffer}
+ * that was passed to the Parquet library.
+ */
 public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
 
   private final BufferAllocator allocator;
-  private final HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
+  private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>();
 
   public ParquetDirectByteBufferAllocator(OperatorContext o){
     allocator = o.getAllocator();
@@ -50,32 +53,59 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   public ByteBuffer allocate(int sz) {
     ByteBuf bb = allocator.buffer(sz);
     ByteBuffer b = bb.nioBuffer(0, sz);
-    allocatedBuffers.put(System.identityHashCode(b), bb);
-    logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b));
+    final Key key = new Key(b);
+    allocatedBuffers.put(key, bb);
+    logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}", sz, key.hash);
     return b;
   }
 
   @Override
   public void release(ByteBuffer b) {
-    Integer id = System.identityHashCode(b);
-    ByteBuf bb = allocatedBuffers.get(id);
+    final Key key = new Key(b);
+    final ByteBuf bb = allocatedBuffers.get(key);
     // The ByteBuffer passed in may already have been freed or not allocated by this allocator.
     // If it is not found in the allocated buffers, do nothing
-    if(bb!=null) {
-      logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: "+System.identityHashCode(b));
+    if(bb != null) {
+      logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}", key.hash);
       bb.release();
-      allocatedBuffers.remove(id);
+      allocatedBuffers.remove(key);
     }
   }
 
-  public void clear(){
-    Iterator it = allocatedBuffers.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry pair = (Map.Entry)it.next();
-      Integer id = (Integer)pair.getKey();
-      ByteBuf bb = allocatedBuffers.get(id);
-      bb.release();
-      it.remove();
+  /**
+   * ByteBuffer wrapper that computes a fixed hashcode.
+   * <br><br>
+   * Parquet only handles {@link ByteBuffer} objects, so we need to use them as keys to keep track of their corresponding
+   * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used as a {@link HashMap} key as it is.<br>
+   * This class solves this by providing a fixed hashcode for {@link ByteBuffer} and using reference equality in case
+   * of collisions (we don't need to compare the content of {@link ByteBuffer} because the object passed to
+   * {@link #release(ByteBuffer)} will be the same object returned from a previous {@link #allocate(int)}).
+   */
+  private class Key {
+    final int hash;
+    final ByteBuffer buffer;
+
+    Key(final ByteBuffer buffer) {
+      this.buffer = buffer;
+      // remember, we can't use buffer.hashCode()
+      this.hash = System.identityHashCode(buffer);
+    }
+
+    @Override
+    public int hashCode() {
+      return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof Key)) {
+        return false;
+      }
+      final Key key = (Key) obj;
+      return hash == key.hash && buffer == key.buffer;
     }
   }
 }
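
----------------------------------------------------------------------

The Javadoc added by this patch points out that a mutable ByteBuffer cannot be used as a HashMap key
directly and that buffer.hashCode() is unusable here: ByteBuffer derives hashCode() and equals() from its
current position and contents, so once data is written into the buffer a content-keyed map can no longer
find the entry, and release() would quietly skip the backing ByteBuf. The short, pure-JDK sketch below is
not part of the commit (the class and variable names are made up for illustration); it only demonstrates
that failure mode and the fixed, identity-based key the patch uses instead.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Illustration only: why a ByteBuffer cannot be tracked by its own hashCode(),
// and how a fixed, identity-based key keeps the lookup stable.
public class IdentityKeyDemo {

  // Minimal stand-in for the Key idea in the patch: identity hash code,
  // reference equality on collisions.
  static final class IdentityKey {
    final ByteBuffer buffer;
    final int hash;

    IdentityKey(final ByteBuffer buffer) {
      this.buffer = buffer;
      this.hash = System.identityHashCode(buffer); // stable for the lifetime of the object
    }

    @Override
    public int hashCode() {
      return hash;
    }

    @Override
    public boolean equals(final Object obj) {
      return obj instanceof IdentityKey && ((IdentityKey) obj).buffer == buffer;
    }
  }

  public static void main(String[] args) {
    final ByteBuffer b = ByteBuffer.allocate(16);

    final Map<ByteBuffer, String> byContent = new HashMap<>();
    final Map<IdentityKey, String> byIdentity = new HashMap<>();
    byContent.put(b, "tracked");
    byIdentity.put(new IdentityKey(b), "tracked");

    // Writing to the buffer changes its position and contents, and with them
    // the results of hashCode() and equals().
    b.putInt(42);

    // The content-keyed entry is now unreachable -- exactly the kind of "lost"
    // mapping that would leave a backing buffer unreleased. The identity-keyed
    // lookup still finds it.
    System.out.println("content-keyed lookup:  " + byContent.get(b));                   // prints null
    System.out.println("identity-keyed lookup: " + byIdentity.get(new IdentityKey(b))); // prints tracked
  }
}

Note that the patch's Key also keeps a reference to the ByteBuffer and compares it in equals(), which
guards against two distinct buffers that happen to share the same identity hash code; the pre-patch code
keyed the map on the bare Integer returned by System.identityHashCode(), which is not guaranteed to be
unique across objects.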