Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/02 05:03:13 UTC

[13/13] git commit: Merge Jason's SQL updates to work with full exec. Random vector updates including changing to copyFrom. Fixes to writable batch. Add an alternative ByteBuf implementation that leverages little endianness.

Merge Jason's SQL updates to work with full exec.
Random vector updates, including changing to copyFrom.
Fixes to writable batch.
Add an alternative ByteBuf implementation that leverages little endianness.
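
For context on the ByteBuf note above: x86 hardware is little-endian, while java.nio buffers default to big-endian, so multi-byte reads and writes through a default-ordered buffer pay for a byte swap on every access. The minimal sketch below uses only standard java.nio classes to show the difference in byte layout; the suggestion that the new PooledByteBufAllocatorL / PooledUnsafeDirectByteBufL classes exist to avoid that swap is an assumption drawn from the commit message, not from their documented behavior.

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class EndiannessSketch {
        public static void main(String[] args) {
            // On typical Drill deployment targets (x86/x86-64) this prints LITTLE_ENDIAN.
            System.out.println("native order: " + ByteOrder.nativeOrder());

            // A direct buffer left at the java.nio default (BIG_ENDIAN) swaps bytes
            // on every putInt/getInt when the host is little-endian.
            ByteBuffer bigEndian = ByteBuffer.allocateDirect(8);
            bigEndian.putInt(0, 0x01020304);

            // Switching the view to LITTLE_ENDIAN lets the same operations map directly
            // onto the host's native layout, which is the effect the "L" buffer classes
            // appear intended to exploit for value vectors (assumption, see lead-in).
            ByteBuffer littleEndian = ByteBuffer.allocateDirect(8).order(ByteOrder.LITTLE_ENDIAN);
            littleEndian.putInt(0, 0x01020304);

            // Same logical value, different byte layout in memory:
            //   big-endian bytes:    01 02 03 04
            //   little-endian bytes: 04 03 02 01
            System.out.printf("first byte, big-endian view:    %02x%n", bigEndian.get(0));
            System.out.printf("first byte, little-endian view: %02x%n", littleEndian.get(0));
        }
    }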


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/103072a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/103072a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/103072a6

Branch: refs/heads/master
Commit: 103072a619741d5e228fdb181501ec2f82e111a3
Parents: 4e289f0
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed Jul 31 20:00:14 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 1 19:51:35 2013 -0700

----------------------------------------------------------------------
 sandbox/prototype/exec/bufferl/pom.xml          |  35 ++
 .../main/java/io/netty/buffer/PoolArenaL.java   | 425 +++++++++++++++++++
 .../main/java/io/netty/buffer/PoolChunkL.java   | 348 +++++++++++++++
 .../java/io/netty/buffer/PoolChunkListL.java    | 129 ++++++
 .../main/java/io/netty/buffer/PoolSubpageL.java | 195 +++++++++
 .../java/io/netty/buffer/PoolThreadCacheL.java  |  33 ++
 .../netty/buffer/PooledByteBufAllocatorL.java   | 263 ++++++++++++
 .../java/io/netty/buffer/PooledByteBufL.java    | 173 ++++++++
 .../buffer/PooledUnsafeDirectByteBufL.java      | 342 +++++++++++++++
 sandbox/prototype/exec/java-exec/pom.xml        |   5 +
 .../templates/FixedValueVectors.java            |   4 +-
 .../templates/NullableValueVectors.java         |   6 +-
 .../templates/RepeatedValueVectors.java         |   2 +-
 .../templates/VariableLengthVectors.java        |   2 +-
 .../apache/drill/exec/client/DrillClient.java   |  39 +-
 .../expr/fn/FunctionImplementationRegistry.java |   4 +-
 .../exec/memory/DirectBufferAllocator.java      |  21 +-
 .../apache/drill/exec/opt/BasicOptimizer.java   |  47 +-
 .../physical/impl/filter/FilterTemplate.java    |  12 +-
 .../materialize/VectorRecordMaterializer.java   |   1 +
 .../impl/project/ProjectorTemplate.java         |   4 +-
 .../physical/impl/svremover/CopierTemplate.java |   2 +-
 .../impl/svremover/RemovingRecordBatch.java     |  12 +-
 .../apache/drill/exec/record/WritableBatch.java |   5 +
 .../exec/record/selection/SelectionVector2.java |   8 +-
 .../apache/drill/exec/rpc/user/UserClient.java  |   8 +-
 .../org/apache/drill/exec/vector/BitVector.java |   4 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   3 +
 .../apache/drill/exec/memory/TestEndianess.java |  24 ++
 sandbox/prototype/exec/pom.xml                  |   1 +
 sandbox/prototype/sqlparser/pom.xml             |  23 +-
 .../java/org/apache/drill/jdbc/DrillTable.java  |  82 +++-
 .../org/apache/drill/optiq/EnumerableDrill.java | 250 -----------
 .../drill/optiq/EnumerableDrillFullEngine.java  |  85 ++++
 .../apache/drill/optiq/EnumerableDrillRel.java  |  20 +-
 .../apache/drill/optiq/EnumerableDrillRule.java |  18 +-
 .../drill/sql/client/full/BatchListener.java    |  48 +++
 .../drill/sql/client/full/BatchLoaderMap.java   | 185 ++++++++
 .../drill/sql/client/full/DrillFullImpl.java    |  64 +++
 .../drill/sql/client/full/ResultEnumerator.java |  48 +++
 .../drill/sql/client/ref/DrillRefImpl.java      | 240 +++++++++++
 .../org/apache/drill/jdbc/test/JdbcAssert.java  |  49 ++-
 .../org/apache/drill/jdbc/test/JdbcTest.java    | 168 +++++---
 43 files changed, 3024 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/pom.xml b/sandbox/prototype/exec/bufferl/pom.xml
new file mode 100644
index 0000000..baa2c3d
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2012 The Netty Project
+  ~
+  ~ The Netty Project licenses this file to you under the Apache License,
+  ~ version 2.0 (the "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at:
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  ~ License for the specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.drill.exec</groupId>
+  <version>4.0.3.Final</version>
+  <artifactId>netty-bufferl</artifactId>
+
+  <name>Netty/Drill/Buffer</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+  
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
new file mode 100644
index 0000000..db9818d
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.StringUtil;
+
+import java.nio.ByteBuffer;
+
+abstract class PoolArenaL<T> {
+
+    final PooledByteBufAllocatorL parent;
+
+    private final int pageSize;
+    private final int maxOrder;
+    private final int pageShifts;
+    private final int chunkSize;
+    private final int subpageOverflowMask;
+
+    private final PoolSubpageL<T>[] tinySubpagePools;
+    private final PoolSubpageL<T>[] smallSubpagePools;
+
+    private final PoolChunkListL<T> q050;
+    private final PoolChunkListL<T> q025;
+    private final PoolChunkListL<T> q000;
+    private final PoolChunkListL<T> qInit;
+    private final PoolChunkListL<T> q075;
+    private final PoolChunkListL<T> q100;
+
+    // TODO: Test if adding padding helps under contention
+    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
+
+    protected PoolArenaL(PooledByteBufAllocatorL parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
+        this.parent = parent;
+        this.pageSize = pageSize;
+        this.maxOrder = maxOrder;
+        this.pageShifts = pageShifts;
+        this.chunkSize = chunkSize;
+        subpageOverflowMask = ~(pageSize - 1);
+
+        tinySubpagePools = newSubpagePoolArray(512 >>> 4);
+        for (int i = 0; i < tinySubpagePools.length; i ++) {
+            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
+        }
+
+        smallSubpagePools = newSubpagePoolArray(pageShifts - 9);
+        for (int i = 0; i < smallSubpagePools.length; i ++) {
+            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
+        }
+
+        q100 = new PoolChunkListL<T>(this, null, 100, Integer.MAX_VALUE);
+        q075 = new PoolChunkListL<T>(this, q100, 75, 100);
+        q050 = new PoolChunkListL<T>(this, q075, 50, 100);
+        q025 = new PoolChunkListL<T>(this, q050, 25, 75);
+        q000 = new PoolChunkListL<T>(this, q025, 1, 50);
+        qInit = new PoolChunkListL<T>(this, q000, Integer.MIN_VALUE, 25);
+
+        q100.prevList = q075;
+        q075.prevList = q050;
+        q050.prevList = q025;
+        q025.prevList = q000;
+        q000.prevList = null;
+        qInit.prevList = qInit;
+    }
+
+    private PoolSubpageL<T> newSubpagePoolHead(int pageSize) {
+        PoolSubpageL<T> head = new PoolSubpageL<T>(pageSize);
+        head.prev = head;
+        head.next = head;
+        return head;
+    }
+
+    @SuppressWarnings("unchecked")
+    private PoolSubpageL<T>[] newSubpagePoolArray(int size) {
+        return new PoolSubpageL[size];
+    }
+
+    PooledByteBufL<T> allocate(PoolThreadCacheL cache, int reqCapacity, int maxCapacity) {
+        PooledByteBufL<T> buf = newByteBuf(maxCapacity);
+        allocate(cache, buf, reqCapacity);
+        return buf;
+    }
+
+    private void allocate(PoolThreadCacheL cache, PooledByteBufL<T> buf, final int reqCapacity) {
+        final int normCapacity = normalizeCapacity(reqCapacity);
+        if ((normCapacity & subpageOverflowMask) == 0) { // capacity < pageSize
+            int tableIdx;
+            PoolSubpageL<T>[] table;
+            if ((normCapacity & 0xFFFFFE00) == 0) { // < 512
+                tableIdx = normCapacity >>> 4;
+                table = tinySubpagePools;
+            } else {
+                tableIdx = 0;
+                int i = normCapacity >>> 10;
+                while (i != 0) {
+                    i >>>= 1;
+                    tableIdx ++;
+                }
+                table = smallSubpagePools;
+            }
+
+            synchronized (this) {
+                final PoolSubpageL<T> head = table[tableIdx];
+                final PoolSubpageL<T> s = head.next;
+                if (s != head) {
+                    assert s.doNotDestroy && s.elemSize == normCapacity;
+                    long handle = s.allocate();
+                    assert handle >= 0;
+                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
+                    return;
+                }
+            }
+        } else if (normCapacity > chunkSize) {
+            allocateHuge(buf, reqCapacity);
+            return;
+        }
+
+        allocateNormal(buf, reqCapacity, normCapacity);
+    }
+
+    private synchronized void allocateNormal(PooledByteBufL<T> buf, int reqCapacity, int normCapacity) {
+        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
+            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
+            q075.allocate(buf, reqCapacity, normCapacity) || q100.allocate(buf, reqCapacity, normCapacity)) {
+            return;
+        }
+
+        // Add a new chunk.
+        PoolChunkL<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
+        long handle = c.allocate(normCapacity);
+        assert handle > 0;
+        c.initBuf(buf, handle, reqCapacity);
+        qInit.add(c);
+    }
+
+    private void allocateHuge(PooledByteBufL<T> buf, int reqCapacity) {
+        buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
+    }
+
+    synchronized void free(PoolChunkL<T> chunk, long handle) {
+        if (chunk.unpooled) {
+            destroyChunk(chunk);
+        } else {
+            chunk.parent.free(chunk, handle);
+        }
+    }
+
+    PoolSubpageL<T> findSubpagePoolHead(int elemSize) {
+        int tableIdx;
+        PoolSubpageL<T>[] table;
+        if ((elemSize & 0xFFFFFE00) == 0) { // < 512
+            tableIdx = elemSize >>> 4;
+            table = tinySubpagePools;
+        } else {
+            tableIdx = 0;
+            elemSize >>>= 10;
+            while (elemSize != 0) {
+                elemSize >>>= 1;
+                tableIdx ++;
+            }
+            table = smallSubpagePools;
+        }
+
+        return table[tableIdx];
+    }
+
+    private int normalizeCapacity(int reqCapacity) {
+        if (reqCapacity < 0) {
+            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
+        }
+        if (reqCapacity >= chunkSize) {
+            return reqCapacity;
+        }
+
+        if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512
+            // Doubled
+
+            int normalizedCapacity = reqCapacity;
+            normalizedCapacity |= normalizedCapacity >>>  1;
+            normalizedCapacity |= normalizedCapacity >>>  2;
+            normalizedCapacity |= normalizedCapacity >>>  4;
+            normalizedCapacity |= normalizedCapacity >>>  8;
+            normalizedCapacity |= normalizedCapacity >>> 16;
+            normalizedCapacity ++;
+
+            if (normalizedCapacity < 0) {
+                normalizedCapacity >>>= 1;
+            }
+
+            return normalizedCapacity;
+        }
+
+        // Quantum-spaced
+        if ((reqCapacity & 15) == 0) {
+            return reqCapacity;
+        }
+
+        return (reqCapacity & ~15) + 16;
+    }
+
+    void reallocate(PooledByteBufL<T> buf, int newCapacity, boolean freeOldMemory) {
+        if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
+            throw new IllegalArgumentException("newCapacity: " + newCapacity);
+        }
+
+        int oldCapacity = buf.length;
+        if (oldCapacity == newCapacity) {
+            return;
+        }
+
+        PoolChunkL<T> oldChunk = buf.chunk;
+        long oldHandle = buf.handle;
+        T oldMemory = buf.memory;
+        int oldOffset = buf.offset;
+
+        int readerIndex = buf.readerIndex();
+        int writerIndex = buf.writerIndex();
+
+        allocate(parent.threadCache.get(), buf, newCapacity);
+        if (newCapacity > oldCapacity) {
+            memoryCopy(
+                    oldMemory, oldOffset + readerIndex,
+                    buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
+        } else if (newCapacity < oldCapacity) {
+            if (readerIndex < newCapacity) {
+                if (writerIndex > newCapacity) {
+                    writerIndex = newCapacity;
+                }
+                memoryCopy(
+                        oldMemory, oldOffset + readerIndex,
+                        buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
+            } else {
+                readerIndex = writerIndex = newCapacity;
+            }
+        }
+
+        buf.setIndex(readerIndex, writerIndex);
+
+        if (freeOldMemory) {
+            free(oldChunk, oldHandle);
+        }
+    }
+
+    protected abstract PoolChunkL<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
+    protected abstract PoolChunkL<T> newUnpooledChunk(int capacity);
+    protected abstract PooledByteBufL<T> newByteBuf(int maxCapacity);
+    protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
+    protected abstract void destroyChunk(PoolChunkL<T> chunk);
+
+    public synchronized String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append("Chunk(s) at 0~25%:");
+        buf.append(StringUtil.NEWLINE);
+        buf.append(qInit);
+        buf.append(StringUtil.NEWLINE);
+        buf.append("Chunk(s) at 0~50%:");
+        buf.append(StringUtil.NEWLINE);
+        buf.append(q000);
+        buf.append(StringUtil.NEWLINE);
+        buf.append("Chunk(s) at 25~75%:");
+        buf.append(StringUtil.NEWLINE);
+        buf.append(q025);
+        buf.append(StringUtil.NEWLINE);
+        buf.append("Chunk(s) at 50~100%:");
+        buf.append(StringUtil.NEWLINE);
+        buf.append(q050);
+        buf.append(StringUtil.NEWLINE);
+        buf.append("Chunk(s) at 75~100%:");
+        buf.append(StringUtil.NEWLINE);
+        buf.append(q075);
+        buf.append(StringUtil.NEWLINE);
+        buf.append("Chunk(s) at 100%:");
+        buf.append(StringUtil.NEWLINE);
+        buf.append(q100);
+        buf.append(StringUtil.NEWLINE);
+        buf.append("tiny subpages:");
+        for (int i = 1; i < tinySubpagePools.length; i ++) {
+            PoolSubpageL<T> head = tinySubpagePools[i];
+            if (head.next == head) {
+                continue;
+            }
+
+            buf.append(StringUtil.NEWLINE);
+            buf.append(i);
+            buf.append(": ");
+            PoolSubpageL<T> s = head.next;
+            for (;;) {
+                buf.append(s);
+                s = s.next;
+                if (s == head) {
+                    break;
+                }
+            }
+        }
+        buf.append(StringUtil.NEWLINE);
+        buf.append("small subpages:");
+        for (int i = 1; i < smallSubpagePools.length; i ++) {
+            PoolSubpageL<T> head = smallSubpagePools[i];
+            if (head.next == head) {
+                continue;
+            }
+
+            buf.append(StringUtil.NEWLINE);
+            buf.append(i);
+            buf.append(": ");
+            PoolSubpageL<T> s = head.next;
+            for (;;) {
+                buf.append(s);
+                s = s.next;
+                if (s == head) {
+                    break;
+                }
+            }
+        }
+        buf.append(StringUtil.NEWLINE);
+
+        return buf.toString();
+    }
+
+    static final class HeapArena extends PoolArenaL<byte[]> {
+
+        HeapArena(PooledByteBufAllocatorL parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
+            super(parent, pageSize, maxOrder, pageShifts, chunkSize);
+        }
+
+        @Override
+        protected PoolChunkL<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
+            return new PoolChunkL<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
+        }
+
+        @Override
+        protected PoolChunkL<byte[]> newUnpooledChunk(int capacity) {
+            return new PoolChunkL<byte[]>(this, new byte[capacity], capacity);
+        }
+
+        @Override
+        protected void destroyChunk(PoolChunkL<byte[]> chunk) {
+            // Rely on GC.
+        }
+
+        @Override
+        protected PooledByteBufL<byte[]> newByteBuf(int maxCapacity) {
+          throw new UnsupportedOperationException();
+//            return PooledHeapByteBufL.newInstance(maxCapacity);
+        }
+
+        @Override
+        protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
+            if (length == 0) {
+                return;
+            }
+
+            System.arraycopy(src, srcOffset, dst, dstOffset, length);
+        }
+    }
+
+    static final class DirectArena extends PoolArenaL<ByteBuffer> {
+
+        private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
+
+        DirectArena(PooledByteBufAllocatorL parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
+            super(parent, pageSize, maxOrder, pageShifts, chunkSize);
+        }
+
+        @Override
+        protected PoolChunkL<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
+            return new PoolChunkL<ByteBuffer>(
+                    this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);
+        }
+
+        @Override
+        protected PoolChunkL<ByteBuffer> newUnpooledChunk(int capacity) {
+            return new PoolChunkL<ByteBuffer>(this, ByteBuffer.allocateDirect(capacity), capacity);
+        }
+
+        @Override
+        protected void destroyChunk(PoolChunkL<ByteBuffer> chunk) {
+            PlatformDependent.freeDirectBuffer(chunk.memory);
+        }
+
+        @Override
+        protected PooledByteBufL<ByteBuffer> newByteBuf(int maxCapacity) {
+            if (HAS_UNSAFE) {
+                return PooledUnsafeDirectByteBufL.newInstance(maxCapacity);
+            } else {
+              throw new UnsupportedOperationException();
+//                return PooledDirectByteBufL.newInstance(maxCapacity);
+            }
+        }
+
+        @Override
+        protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
+            if (length == 0) {
+                return;
+            }
+
+            if (HAS_UNSAFE) {
+                PlatformDependent.copyMemory(
+                        PlatformDependent.directBufferAddress(src) + srcOffset,
+                        PlatformDependent.directBufferAddress(dst) + dstOffset, length);
+            } else {
+                // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
+                src = src.duplicate();
+                dst = dst.duplicate();
+                src.position(srcOffset).limit(srcOffset + length);
+                dst.position(dstOffset);
+                dst.put(src);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java
new file mode 100644
index 0000000..8e8f32b
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+final class PoolChunkL<T> {
+    private static final int ST_UNUSED = 0;
+    private static final int ST_BRANCH = 1;
+    private static final int ST_ALLOCATED = 2;
+    private static final int ST_ALLOCATED_SUBPAGE = ST_ALLOCATED | 1;
+
+    private static final long multiplier = 0x5DEECE66DL;
+    private static final long addend = 0xBL;
+    private static final long mask = (1L << 48) - 1;
+
+    final PoolArenaL<T> arena;
+    final T memory;
+    final boolean unpooled;
+
+    private final int[] memoryMap;
+    private final PoolSubpageL<T>[] subpages;
+    /** Used to determine if the requested capacity is equal to or greater than pageSize. */
+    private final int subpageOverflowMask;
+    private final int pageSize;
+    private final int pageShifts;
+
+    private final int chunkSize;
+    private final int maxSubpageAllocs;
+
+    private long random = (System.nanoTime() ^ multiplier) & mask;
+
+    private int freeBytes;
+
+    PoolChunkListL<T> parent;
+    PoolChunkL<T> prev;
+    PoolChunkL<T> next;
+
+    // TODO: Test if adding padding helps under contention
+    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
+
+    PoolChunkL(PoolArenaL<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
+        unpooled = false;
+        this.arena = arena;
+        this.memory = memory;
+        this.pageSize = pageSize;
+        this.pageShifts = pageShifts;
+        this.chunkSize = chunkSize;
+        subpageOverflowMask = ~(pageSize - 1);
+        freeBytes = chunkSize;
+
+        int chunkSizeInPages = chunkSize >>> pageShifts;
+        maxSubpageAllocs = 1 << maxOrder;
+
+        // Generate the memory map.
+        memoryMap = new int[maxSubpageAllocs << 1];
+        int memoryMapIndex = 1;
+        for (int i = 0; i <= maxOrder; i ++) {
+            int runSizeInPages = chunkSizeInPages >>> i;
+            for (int j = 0; j < chunkSizeInPages; j += runSizeInPages) {
+                //noinspection PointlessBitwiseExpression
+                memoryMap[memoryMapIndex ++] = j << 17 | runSizeInPages << 2 | ST_UNUSED;
+            }
+        }
+
+        subpages = newSubpageArray(maxSubpageAllocs);
+    }
+
+    /** Creates a special chunk that is not pooled. */
+    PoolChunkL(PoolArenaL<T> arena, T memory, int size) {
+        unpooled = true;
+        this.arena = arena;
+        this.memory = memory;
+        memoryMap = null;
+        subpages = null;
+        subpageOverflowMask = 0;
+        pageSize = 0;
+        pageShifts = 0;
+        chunkSize = size;
+        maxSubpageAllocs = 0;
+    }
+
+    @SuppressWarnings("unchecked")
+    private PoolSubpageL<T>[] newSubpageArray(int size) {
+        return new PoolSubpageL[size];
+    }
+
+    int usage() {
+        if (freeBytes == 0) {
+            return 100;
+        }
+
+        int freePercentage = (int) (freeBytes * 100L / chunkSize);
+        if (freePercentage == 0) {
+            return 99;
+        }
+        return 100 - freePercentage;
+    }
+
+    long allocate(int normCapacity) {
+        int firstVal = memoryMap[1];
+        if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
+            return allocateRun(normCapacity, 1, firstVal);
+        } else {
+            return allocateSubpage(normCapacity, 1, firstVal);
+        }
+    }
+
+    private long allocateRun(int normCapacity, int curIdx, int val) {
+        for (;;) {
+            if ((val & ST_ALLOCATED) != 0) { // state == ST_ALLOCATED || state == ST_ALLOCATED_SUBPAGE
+                return -1;
+            }
+
+            if ((val & ST_BRANCH) != 0) { // state == ST_BRANCH
+                int nextIdx = curIdx << 1 ^ nextRandom();
+                long res = allocateRun(normCapacity, nextIdx, memoryMap[nextIdx]);
+                if (res > 0) {
+                    return res;
+                }
+
+                curIdx = nextIdx ^ 1;
+                val = memoryMap[curIdx];
+                continue;
+            }
+
+            // state == ST_UNUSED
+            return allocateRunSimple(normCapacity, curIdx, val);
+        }
+    }
+
+    private long allocateRunSimple(int normCapacity, int curIdx, int val) {
+        int runLength = runLength(val);
+        if (normCapacity > runLength) {
+            return -1;
+        }
+
+        for (;;) {
+            if (normCapacity == runLength) {
+                // Found the run that fits.
+                // Note that capacity has been normalized already, so we don't need to deal with
+                // the values that are not power of 2.
+                memoryMap[curIdx] = val & ~3 | ST_ALLOCATED;
+                freeBytes -= runLength;
+                return curIdx;
+            }
+
+            int nextIdx = curIdx << 1 ^ nextRandom();
+            int unusedIdx = nextIdx ^ 1;
+
+            memoryMap[curIdx] = val & ~3 | ST_BRANCH;
+            //noinspection PointlessBitwiseExpression
+            memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED;
+
+            runLength >>>= 1;
+            curIdx = nextIdx;
+            val = memoryMap[curIdx];
+        }
+    }
+
+    private long allocateSubpage(int normCapacity, int curIdx, int val) {
+        int state = val & 3;
+        if (state == ST_BRANCH) {
+            int nextIdx = curIdx << 1 ^ nextRandom();
+            long res = branchSubpage(normCapacity, nextIdx);
+            if (res > 0) {
+                return res;
+            }
+
+            return branchSubpage(normCapacity, nextIdx ^ 1);
+        }
+
+        if (state == ST_UNUSED) {
+            return allocateSubpageSimple(normCapacity, curIdx, val);
+        }
+
+        if (state == ST_ALLOCATED_SUBPAGE) {
+            PoolSubpageL<T> subpage = subpages[subpageIdx(curIdx)];
+            int elemSize = subpage.elemSize;
+            if (normCapacity != elemSize) {
+                return -1;
+            }
+
+            return subpage.allocate();
+        }
+
+        return -1;
+    }
+
+    private long allocateSubpageSimple(int normCapacity, int curIdx, int val) {
+        int runLength = runLength(val);
+        for (;;) {
+            if (runLength == pageSize) {
+                memoryMap[curIdx] = val & ~3 | ST_ALLOCATED_SUBPAGE;
+                freeBytes -= runLength;
+
+                int subpageIdx = subpageIdx(curIdx);
+                PoolSubpageL<T> subpage = subpages[subpageIdx];
+                if (subpage == null) {
+                    subpage = new PoolSubpageL<T>(this, curIdx, runOffset(val), pageSize, normCapacity);
+                    subpages[subpageIdx] = subpage;
+                } else {
+                    subpage.init(normCapacity);
+                }
+                return subpage.allocate();
+            }
+
+            int nextIdx = curIdx << 1 ^ nextRandom();
+            int unusedIdx = nextIdx ^ 1;
+
+            memoryMap[curIdx] = val & ~3 | ST_BRANCH;
+            //noinspection PointlessBitwiseExpression
+            memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED;
+
+            runLength >>>= 1;
+            curIdx = nextIdx;
+            val = memoryMap[curIdx];
+        }
+    }
+
+    private long branchSubpage(int normCapacity, int nextIdx) {
+        int nextVal = memoryMap[nextIdx];
+        if ((nextVal & 3) != ST_ALLOCATED) {
+            return allocateSubpage(normCapacity, nextIdx, nextVal);
+        }
+        return -1;
+    }
+
+    void free(long handle) {
+        int memoryMapIdx = (int) handle;
+        int bitmapIdx = (int) (handle >>> 32);
+
+        int val = memoryMap[memoryMapIdx];
+        int state = val & 3;
+        if (state == ST_ALLOCATED_SUBPAGE) {
+            assert bitmapIdx != 0;
+            PoolSubpageL<T> subpage = subpages[subpageIdx(memoryMapIdx)];
+            assert subpage != null && subpage.doNotDestroy;
+            if (subpage.free(bitmapIdx & 0x3FFFFFFF)) {
+                return;
+            }
+        } else {
+            assert state == ST_ALLOCATED : "state: " + state;
+            assert bitmapIdx == 0;
+        }
+
+        freeBytes += runLength(val);
+
+        for (;;) {
+            //noinspection PointlessBitwiseExpression
+            memoryMap[memoryMapIdx] = val & ~3 | ST_UNUSED;
+            if (memoryMapIdx == 1) {
+                assert freeBytes == chunkSize;
+                return;
+            }
+
+            if ((memoryMap[siblingIdx(memoryMapIdx)] & 3) != ST_UNUSED) {
+                break;
+            }
+
+            memoryMapIdx = parentIdx(memoryMapIdx);
+            val = memoryMap[memoryMapIdx];
+        }
+    }
+
+    void initBuf(PooledByteBufL<T> buf, long handle, int reqCapacity) {
+        int memoryMapIdx = (int) handle;
+        int bitmapIdx = (int) (handle >>> 32);
+        if (bitmapIdx == 0) {
+            int val = memoryMap[memoryMapIdx];
+            assert (val & 3) == ST_ALLOCATED : String.valueOf(val & 3);
+            buf.init(this, handle, runOffset(val), reqCapacity, runLength(val));
+        } else {
+            initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
+        }
+    }
+
+    void initBufWithSubpage(PooledByteBufL<T> buf, long handle, int reqCapacity) {
+        initBufWithSubpage(buf, handle, (int) (handle >>> 32), reqCapacity);
+    }
+
+    private void initBufWithSubpage(PooledByteBufL<T> buf, long handle, int bitmapIdx, int reqCapacity) {
+        assert bitmapIdx != 0;
+
+        int memoryMapIdx = (int) handle;
+        int val = memoryMap[memoryMapIdx];
+        assert (val & 3) == ST_ALLOCATED_SUBPAGE;
+
+        PoolSubpageL<T> subpage = subpages[subpageIdx(memoryMapIdx)];
+        assert subpage.doNotDestroy;
+        assert reqCapacity <= subpage.elemSize;
+
+        buf.init(
+                this, handle,
+                runOffset(val) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize);
+    }
+
+    private static int parentIdx(int memoryMapIdx) {
+        return memoryMapIdx >>> 1;
+    }
+
+    private static int siblingIdx(int memoryMapIdx) {
+        return memoryMapIdx ^ 1;
+    }
+
+    private int runLength(int val) {
+        return (val >>> 2 & 0x7FFF) << pageShifts;
+    }
+
+    private int runOffset(int val) {
+        return val >>> 17 << pageShifts;
+    }
+
+    private int subpageIdx(int memoryMapIdx) {
+        return memoryMapIdx - maxSubpageAllocs;
+    }
+
+    private int nextRandom() {
+        random = random * multiplier + addend & mask;
+        return (int) (random >>> 47) & 1;
+    }
+
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append("Chunk(");
+        buf.append(Integer.toHexString(System.identityHashCode(this)));
+        buf.append(": ");
+        buf.append(usage());
+        buf.append("%, ");
+        buf.append(chunkSize - freeBytes);
+        buf.append('/');
+        buf.append(chunkSize);
+        buf.append(')');
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java
new file mode 100644
index 0000000..4e0fb88
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+import io.netty.util.internal.StringUtil;
+
+final class PoolChunkListL<T> {
+    private final PoolArenaL<T> arena;
+    private final PoolChunkListL<T> nextList;
+    PoolChunkListL<T> prevList;
+
+    private final int minUsage;
+    private final int maxUsage;
+
+    private PoolChunkL<T> head;
+
+    // TODO: Test if adding padding helps under contention
+    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
+
+    PoolChunkListL(PoolArenaL<T> arena, PoolChunkListL<T> nextList, int minUsage, int maxUsage) {
+        this.arena = arena;
+        this.nextList = nextList;
+        this.minUsage = minUsage;
+        this.maxUsage = maxUsage;
+    }
+
+    boolean allocate(PooledByteBufL<T> buf, int reqCapacity, int normCapacity) {
+        if (head == null) {
+            return false;
+        }
+
+        for (PoolChunkL<T> cur = head;;) {
+            long handle = cur.allocate(normCapacity);
+            if (handle < 0) {
+                cur = cur.next;
+                if (cur == null) {
+                    return false;
+                }
+            } else {
+                cur.initBuf(buf, handle, reqCapacity);
+                if (cur.usage() >= maxUsage) {
+                    remove(cur);
+                    nextList.add(cur);
+                }
+                return true;
+            }
+        }
+    }
+
+    void free(PoolChunkL<T> chunk, long handle) {
+        chunk.free(handle);
+        if (chunk.usage() < minUsage) {
+            remove(chunk);
+            if (prevList == null) {
+                assert chunk.usage() == 0;
+                arena.destroyChunk(chunk);
+            } else {
+                prevList.add(chunk);
+            }
+        }
+    }
+
+    void add(PoolChunkL<T> chunk) {
+        if (chunk.usage() >= maxUsage) {
+            nextList.add(chunk);
+            return;
+        }
+
+        chunk.parent = this;
+        if (head == null) {
+            head = chunk;
+            chunk.prev = null;
+            chunk.next = null;
+        } else {
+            chunk.prev = null;
+            chunk.next = head;
+            head.prev = chunk;
+            head = chunk;
+        }
+    }
+
+    private void remove(PoolChunkL<T> cur) {
+        if (cur == head) {
+            head = cur.next;
+            if (head != null) {
+                head.prev = null;
+            }
+        } else {
+            PoolChunkL<T> next = cur.next;
+            cur.prev.next = next;
+            if (next != null) {
+                next.prev = cur.prev;
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (head == null) {
+            return "none";
+        }
+
+        StringBuilder buf = new StringBuilder();
+        for (PoolChunkL<T> cur = head;;) {
+            buf.append(cur);
+            cur = cur.next;
+            if (cur == null) {
+                break;
+            }
+            buf.append(StringUtil.NEWLINE);
+        }
+
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java
new file mode 100644
index 0000000..96c3efe
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+final class PoolSubpageL<T> {
+
+    final PoolChunkL<T> chunk;
+    final int memoryMapIdx;
+    final int runOffset;
+    final int pageSize;
+    final long[] bitmap;
+
+    PoolSubpageL<T> prev;
+    PoolSubpageL<T> next;
+
+    boolean doNotDestroy;
+    int elemSize;
+    int maxNumElems;
+    int nextAvail;
+    int bitmapLength;
+    int numAvail;
+
+    // TODO: Test if adding padding helps under contention
+    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
+
+    /** Special constructor that creates a linked list head */
+    PoolSubpageL(int pageSize) {
+        chunk = null;
+        memoryMapIdx = -1;
+        runOffset = -1;
+        elemSize = -1;
+        this.pageSize = pageSize;
+        bitmap = null;
+    }
+
+    PoolSubpageL(PoolChunkL<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
+        this.chunk = chunk;
+        this.memoryMapIdx = memoryMapIdx;
+        this.runOffset = runOffset;
+        this.pageSize = pageSize;
+        bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
+        init(elemSize);
+    }
+
+    void init(int elemSize) {
+        doNotDestroy = true;
+        this.elemSize = elemSize;
+        if (elemSize != 0) {
+            maxNumElems = numAvail = pageSize / elemSize;
+            nextAvail = 0;
+            bitmapLength = maxNumElems >>> 6;
+            if ((maxNumElems & 63) != 0) {
+                bitmapLength ++;
+            }
+
+            for (int i = 0; i < bitmapLength; i ++) {
+                bitmap[i] = 0;
+            }
+        }
+
+        addToPool();
+    }
+
+    /**
+     * Returns the bitmap index of the subpage allocation.
+     */
+    long allocate() {
+        if (elemSize == 0) {
+            return toHandle(0);
+        }
+
+        if (numAvail == 0 || !doNotDestroy) {
+            return -1;
+        }
+
+        final int bitmapIdx = nextAvail;
+        int q = bitmapIdx >>> 6;
+        int r = bitmapIdx & 63;
+        assert (bitmap[q] >>> r & 1) == 0;
+        bitmap[q] |= 1L << r;
+
+        if (-- numAvail == 0) {
+            removeFromPool();
+            nextAvail = -1;
+        } else {
+            nextAvail = findNextAvailable();
+        }
+
+        return toHandle(bitmapIdx);
+    }
+
+    /**
+     * @return {@code true} if this subpage is in use.
+     *         {@code false} if this subpage is not used by its chunk and thus it's OK to be released.
+     */
+    boolean free(int bitmapIdx) {
+
+        if (elemSize == 0) {
+            return true;
+        }
+
+        int q = bitmapIdx >>> 6;
+        int r = bitmapIdx & 63;
+        assert (bitmap[q] >>> r & 1) != 0;
+        bitmap[q] ^= 1L << r;
+
+        if (numAvail ++ == 0) {
+            nextAvail = bitmapIdx;
+            addToPool();
+            return true;
+        }
+
+        if (numAvail != maxNumElems) {
+            return true;
+        } else {
+            // Subpage not in use (numAvail == maxNumElems)
+            if (prev == next) {
+                // Do not remove if this subpage is the only one left in the pool.
+                return true;
+            }
+
+            // Remove this subpage from the pool if there are other subpages left in the pool.
+            doNotDestroy = false;
+            removeFromPool();
+            return false;
+        }
+    }
+
+    private void addToPool() {
+        PoolSubpageL<T> head = chunk.arena.findSubpagePoolHead(elemSize);
+        assert prev == null && next == null;
+        prev = head;
+        next = head.next;
+        next.prev = this;
+        head.next = this;
+    }
+
+    private void removeFromPool() {
+        assert prev != null && next != null;
+        prev.next = next;
+        next.prev = prev;
+        next = null;
+        prev = null;
+    }
+
+    private int findNextAvailable() {
+        int newNextAvail = -1;
+        loop:
+        for (int i = 0; i < bitmapLength; i ++) {
+            long bits = bitmap[i];
+            if (~bits != 0) {
+                for (int j = 0; j < 64; j ++) {
+                    if ((bits & 1) == 0) {
+                        newNextAvail = i << 6 | j;
+                        break loop;
+                    }
+                    bits >>>= 1;
+                }
+            }
+        }
+
+        if (newNextAvail < maxNumElems) {
+            return newNextAvail;
+        } else {
+            return -1;
+        }
+    }
+
+    private long toHandle(int bitmapIdx) {
+        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
+    }
+
+    public String toString() {
+        if (!doNotDestroy) {
+            return "(" + memoryMapIdx + ": not in use)";
+        }
+
+        return String.valueOf('(') + memoryMapIdx + ": " + (maxNumElems - numAvail) + '/' + maxNumElems +
+               ", offset: " + runOffset + ", length: " + pageSize + ", elemSize: " + elemSize + ')';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java
new file mode 100644
index 0000000..9973911
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+import java.nio.ByteBuffer;
+
+final class PoolThreadCacheL {
+
+    final PoolArenaL<byte[]> heapArena;
+    final PoolArenaL<ByteBuffer> directArena;
+
+    // TODO: Test if adding padding helps under contention
+    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
+
+    PoolThreadCacheL(PoolArenaL<byte[]> heapArena, PoolArenaL<ByteBuffer> directArena) {
+        this.heapArena = heapArena;
+        this.directArena = directArena;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
new file mode 100644
index 0000000..a05b0d1
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.StringUtil;
+import io.netty.util.internal.SystemPropertyUtil;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PooledByteBufAllocatorL extends AbstractByteBufAllocator {
+
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocatorL.class);
+
+    private static final int DEFAULT_NUM_HEAP_ARENA;
+    private static final int DEFAULT_NUM_DIRECT_ARENA;
+
+    private static final int DEFAULT_PAGE_SIZE;
+    private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
+
+    private static final int MIN_PAGE_SIZE = 4096;
+    private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
+
+    static {
+        int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
+        Throwable pageSizeFallbackCause = null;
+        try {
+            validateAndCalculatePageShifts(defaultPageSize);
+        } catch (Throwable t) {
+            pageSizeFallbackCause = t;
+            defaultPageSize = 8192;
+        }
+        DEFAULT_PAGE_SIZE = defaultPageSize;
+
+        int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
+        Throwable maxOrderFallbackCause = null;
+        try {
+            validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
+        } catch (Throwable t) {
+            maxOrderFallbackCause = t;
+            defaultMaxOrder = 11;
+        }
+        DEFAULT_MAX_ORDER = defaultMaxOrder;
+
+        // Determine reasonable default for nHeapArena and nDirectArena.
+        // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
+        final Runtime runtime = Runtime.getRuntime();
+        final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
+        DEFAULT_NUM_HEAP_ARENA = Math.max(0,
+                SystemPropertyUtil.getInt(
+                        "io.netty.allocator.numHeapArenas",
+                        (int) Math.min(
+                                runtime.availableProcessors(),
+                                Runtime.getRuntime().maxMemory() / defaultChunkSize / 2 / 3)));
+        DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
+                SystemPropertyUtil.getInt(
+                        "io.netty.allocator.numDirectArenas",
+                        (int) Math.min(
+                                runtime.availableProcessors(),
+                                PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
+            logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
+            if (pageSizeFallbackCause == null) {
+                logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
+            } else {
+                logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
+            }
+            if (maxOrderFallbackCause == null) {
+                logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
+            } else {
+                logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
+            }
+            logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
+        }
+    }
+
+    public static final PooledByteBufAllocatorL DEFAULT =
+            new PooledByteBufAllocatorL(PlatformDependent.directBufferPreferred());
+
+    private final PoolArenaL<byte[]>[] heapArenas;
+    private final PoolArenaL<ByteBuffer>[] directArenas;
+
+    final ThreadLocal<PoolThreadCacheL> threadCache = new ThreadLocal<PoolThreadCacheL>() {
+        private final AtomicInteger index = new AtomicInteger();
+        @Override
+        protected PoolThreadCacheL initialValue() {
+            final int idx = index.getAndIncrement();
+            final PoolArenaL<byte[]> heapArena;
+            final PoolArenaL<ByteBuffer> directArena;
+
+            if (heapArenas != null) {
+                heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
+            } else {
+                heapArena = null;
+            }
+
+            if (directArenas != null) {
+                directArena = directArenas[Math.abs(idx % directArenas.length)];
+            } else {
+                directArena = null;
+            }
+
+            return new PoolThreadCacheL(heapArena, directArena);
+        }
+    };
+
+    public PooledByteBufAllocatorL() {
+        this(false);
+    }
+
+    public PooledByteBufAllocatorL(boolean preferDirect) {
+        this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
+    }
+
+    public PooledByteBufAllocatorL(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
+        this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
+    }
+
+    public PooledByteBufAllocatorL(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
+        super(preferDirect);
+
+        final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
+
+        if (nHeapArena < 0) {
+            throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
+        }
+        if (nDirectArena < 0) {
+            throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
+        }
+
+        int pageShifts = validateAndCalculatePageShifts(pageSize);
+
+        if (nHeapArena > 0) {
+            heapArenas = newArenaArray(nHeapArena);
+            for (int i = 0; i < heapArenas.length; i ++) {
+                heapArenas[i] = new PoolArenaL.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
+            }
+        } else {
+            heapArenas = null;
+        }
+
+        if (nDirectArena > 0) {
+            directArenas = newArenaArray(nDirectArena);
+            for (int i = 0; i < directArenas.length; i ++) {
+                directArenas[i] = new PoolArenaL.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
+            }
+        } else {
+            directArenas = null;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> PoolArenaL<T>[] newArenaArray(int size) {
+        return new PoolArenaL[size];
+    }
+
+    private static int validateAndCalculatePageShifts(int pageSize) {
+        if (pageSize < MIN_PAGE_SIZE) {
+            throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: 4096+)");
+        }
+
+        // Ensure pageSize is power of 2.
+        boolean found1 = false;
+        int pageShifts = 0;
+        for (int i = pageSize; i != 0 ; i >>= 1) {
+            if ((i & 1) != 0) {
+                if (!found1) {
+                    found1 = true;
+                } else {
+                    throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2");
+                }
+            } else {
+                if (!found1) {
+                    pageShifts ++;
+                }
+            }
+        }
+        return pageShifts;
+    }
+
+    private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
+        if (maxOrder > 14) {
+            throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
+        }
+
+        // Ensure the resulting chunkSize does not overflow.
+        int chunkSize = pageSize;
+        for (int i = maxOrder; i > 0; i --) {
+            if (chunkSize > MAX_CHUNK_SIZE / 2) {
+                throw new IllegalArgumentException(String.format(
+                        "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
+            }
+            chunkSize <<= 1;
+        }
+        return chunkSize;
+    }
+
+    @Override
+    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+        PoolThreadCacheL cache = threadCache.get();
+        PoolArenaL<byte[]> heapArena = cache.heapArena;
+        if (heapArena != null) {
+            return heapArena.allocate(cache, initialCapacity, maxCapacity);
+        } else {
+          throw new UnsupportedOperationException();
+        }
+    }
+
+    @Override
+    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+        PoolThreadCacheL cache = threadCache.get();
+        PoolArenaL<ByteBuffer> directArena = cache.directArena;
+        if (directArena != null) {
+            return directArena.allocate(cache, initialCapacity, maxCapacity);
+        } else {
+            if (PlatformDependent.hasUnsafe()) {
+              throw new UnsupportedOperationException();
+//                return new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
+            } else {
+              throw new UnsupportedOperationException();
+//                return new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
+            }
+        }
+    }
+
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append(heapArenas.length);
+        buf.append(" heap arena(s):");
+        buf.append(StringUtil.NEWLINE);
+        for (PoolArenaL<byte[]> a: heapArenas) {
+            buf.append(a);
+        }
+        buf.append(directArenas.length);
+        buf.append(" direct arena(s):");
+        buf.append(StringUtil.NEWLINE);
+        for (PoolArenaL<ByteBuffer> a: directArenas) {
+            buf.append(a);
+        }
+        return buf.toString();
+    }
+}
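
A quick orientation for readers skimming the patch: PooledByteBufAllocatorL is a local copy of Netty's pooled allocator wired to the *L classes above, and it is driven through the normal ByteBufAllocator API. A minimal sketch, assuming the default arena configuration (illustrative values, not code from this commit):

    ByteBufAllocator alloc = new PooledByteBufAllocatorL(true); // true = prefer direct buffers
    ByteBuf buf = alloc.directBuffer(4096);   // dispatched to newDirectBuffer() above
    buf.writeLong(42L);                       // standard ByteBuf accessors apply
    buf.release();                            // hands the space back to the owning arena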

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
new file mode 100644
index 0000000..c25c2e9
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+import io.netty.buffer.AbstractReferenceCountedByteBuf;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.Recycler;
+import io.netty.util.ResourceLeak;
+import io.netty.util.ResourceLeakDetector;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+abstract class PooledByteBufL<T> extends AbstractReferenceCountedByteBuf {
+
+    private final ResourceLeak leak;
+    private final Recycler.Handle recyclerHandle;
+
+    protected PoolChunkL<T> chunk;
+    protected long handle;
+    protected T memory;
+    protected int offset;
+    protected int length;
+    private int maxLength;
+
+    private ByteBuffer tmpNioBuf;
+
+    protected PooledByteBufL(Recycler.Handle recyclerHandle, int maxCapacity) {
+        super(maxCapacity);
+        leak = leakDetector.open(this);
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    void init(PoolChunkL<T> chunk, long handle, int offset, int length, int maxLength) {
+        assert handle >= 0;
+        assert chunk != null;
+
+        this.chunk = chunk;
+        this.handle = handle;
+        memory = chunk.memory;
+        this.offset = offset;
+        this.length = length;
+        this.maxLength = maxLength;
+        setIndex(0, 0);
+        tmpNioBuf = null;
+    }
+
+    void initUnpooled(PoolChunkL<T> chunk, int length) {
+        assert chunk != null;
+
+        this.chunk = chunk;
+        handle = 0;
+        memory = chunk.memory;
+        offset = 0;
+        this.length = maxLength = length;
+        setIndex(0, 0);
+        tmpNioBuf = null;
+    }
+
+    @Override
+    public final int capacity() {
+        return length;
+    }
+
+    @Override
+    public final ByteBuf capacity(int newCapacity) {
+        ensureAccessible();
+
+        // If the requested capacity does not require reallocation, just update the length of the memory.
+        if (chunk.unpooled) {
+            if (newCapacity == length) {
+                return this;
+            }
+        } else {
+            if (newCapacity > length) {
+                if (newCapacity <= maxLength) {
+                    length = newCapacity;
+                    return this;
+                }
+            } else if (newCapacity < length) {
+                if (newCapacity > maxLength >>> 1) {
+                    if (maxLength <= 512) {
+                        if (newCapacity > maxLength - 16) {
+                            length = newCapacity;
+                            setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
+                            return this;
+                        }
+                    } else { // > 512 (i.e. >= 1024)
+                        length = newCapacity;
+                        setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
+                        return this;
+                    }
+                }
+            } else {
+                return this;
+            }
+        }
+
+        // Reallocation required.
+        chunk.arena.reallocate(this, newCapacity, true);
+        return this;
+    }
+
+    @Override
+    public final ByteBufAllocator alloc() {
+        return chunk.arena.parent;
+    }
+
+    @Override
+    public final ByteOrder order() {
+        return ByteOrder.BIG_ENDIAN;
+    }
+
+    @Override
+    public final ByteBuf unwrap() {
+        return null;
+    }
+
+    protected final ByteBuffer internalNioBuffer() {
+        ByteBuffer tmpNioBuf = this.tmpNioBuf;
+        if (tmpNioBuf == null) {
+            this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);
+        }
+        return tmpNioBuf;
+    }
+
+    protected abstract ByteBuffer newInternalNioBuffer(T memory);
+
+    @Override
+    protected final void deallocate() {
+        if (handle >= 0) {
+            final long handle = this.handle;
+            this.handle = -1;
+            memory = null;
+            chunk.arena.free(chunk, handle);
+            if (ResourceLeakDetector.ENABLED) {
+                leak.close();
+            } else {
+                recycle();
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void recycle() {
+        Recycler.Handle recyclerHandle = this.recyclerHandle;
+        if (recyclerHandle != null) {
+            setRefCnt(1);
+            ((Recycler<Object>) recycler()).recycle(this, recyclerHandle);
+        }
+    }
+
+    protected abstract Recycler<?> recycler();
+
+    protected final int idx(int index) {
+        return offset + index;
+    }
+}
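
One behavioural note on the capacity(int) override above: as long as the new size stays within the maxLength recorded at init time, the buffer is resized in place and no arena reallocation happens. A hedged sketch of what that looks like through the public API (sizes are illustrative; the actual maxLength comes from how the arena normalized the original request):

    ByteBufAllocator alloc = new PooledByteBufAllocatorL(true);
    ByteBuf buf = alloc.directBuffer(100);  // arena rounds the request up (normalizeCapacity)
    buf.capacity(112);                      // still within maxLength: only 'length' is updated
    buf.capacity(64 * 1024);                // exceeds maxLength: chunk.arena.reallocate(...) copies
    buf.release();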

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
new file mode 100644
index 0000000..949f9fb
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.netty.buffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
+
+    private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+    private static final Recycler<PooledUnsafeDirectByteBufL> RECYCLER = new Recycler<PooledUnsafeDirectByteBufL>() {
+        @Override
+        protected PooledUnsafeDirectByteBufL newObject(Handle handle) {
+            return new PooledUnsafeDirectByteBufL(handle, 0);
+        }
+    };
+
+    static PooledUnsafeDirectByteBufL newInstance(int maxCapacity) {
+        PooledUnsafeDirectByteBufL buf = RECYCLER.get();
+        buf.maxCapacity(maxCapacity);
+        return buf;
+    }
+
+    private long memoryAddress;
+
+    private PooledUnsafeDirectByteBufL(Recycler.Handle recyclerHandle, int maxCapacity) {
+        super(recyclerHandle, maxCapacity);
+    }
+
+    @Override
+    void init(PoolChunkL<ByteBuffer> chunk, long handle, int offset, int length, int maxLength) {
+        super.init(chunk, handle, offset, length, maxLength);
+        initMemoryAddress();
+    }
+
+    @Override
+    void initUnpooled(PoolChunkL<ByteBuffer> chunk, int length) {
+        super.initUnpooled(chunk, length);
+        initMemoryAddress();
+    }
+
+    private void initMemoryAddress() {
+        memoryAddress = PlatformDependent.directBufferAddress(memory) + offset;
+    }
+
+    @Override
+    protected ByteBuffer newInternalNioBuffer(ByteBuffer memory) {
+        return memory.duplicate();
+    }
+
+    @Override
+    public boolean isDirect() {
+        return true;
+    }
+
+    @Override
+    protected byte _getByte(int index) {
+        return PlatformDependent.getByte(addr(index));
+    }
+
+    @Override
+    protected short _getShort(int index) {
+        short v = PlatformDependent.getShort(addr(index));
+        return NATIVE_ORDER? v : Short.reverseBytes(v);
+    }
+
+    @Override
+    protected int _getUnsignedMedium(int index) {
+        long addr = addr(index);
+        return (PlatformDependent.getByte(addr) & 0xff) << 16 |
+                (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+                PlatformDependent.getByte(addr + 2) & 0xff;
+    }
+
+    @Override
+    protected int _getInt(int index) {
+        int v = PlatformDependent.getInt(addr(index));
+        return NATIVE_ORDER? v : Integer.reverseBytes(v);
+    }
+
+    @Override
+    protected long _getLong(int index) {
+        long v = PlatformDependent.getLong(addr(index));
+        return NATIVE_ORDER? v : Long.reverseBytes(v);
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+        checkIndex(index, length);
+        if (dst == null) {
+            throw new NullPointerException("dst");
+        }
+        if (dstIndex < 0 || dstIndex > dst.capacity() - length) {
+            throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
+        }
+
+        if (length != 0) {
+            if (dst.hasMemoryAddress()) {
+                PlatformDependent.copyMemory(addr(index), dst.memoryAddress() + dstIndex, length);
+            } else if (dst.hasArray()) {
+                PlatformDependent.copyMemory(addr(index), dst.array(), dst.arrayOffset() + dstIndex, length);
+            } else {
+                dst.setBytes(dstIndex, this, index, length);
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+        checkIndex(index, length);
+        if (dst == null) {
+            throw new NullPointerException("dst");
+        }
+        if (dstIndex < 0 || dstIndex > dst.length - length) {
+            throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
+        }
+        if (length != 0) {
+            PlatformDependent.copyMemory(addr(index), dst, dstIndex, length);
+        }
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuffer dst) {
+        checkIndex(index);
+        int bytesToCopy = Math.min(capacity() - index, dst.remaining());
+        ByteBuffer tmpBuf = internalNioBuffer();
+        index = idx(index);
+        tmpBuf.clear().position(index).limit(index + bytesToCopy);
+        dst.put(tmpBuf);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+        checkIndex(index, length);
+        if (length != 0) {
+            byte[] tmp = new byte[length];
+            PlatformDependent.copyMemory(addr(index), tmp, 0, length);
+            out.write(tmp);
+        }
+        return this;
+    }
+
+    @Override
+    public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+        checkIndex(index, length);
+        if (length == 0) {
+            return 0;
+        }
+
+        ByteBuffer tmpBuf = internalNioBuffer();
+        index = idx(index);
+        tmpBuf.clear().position(index).limit(index + length);
+        return out.write(tmpBuf);
+    }
+
+    @Override
+    protected void _setByte(int index, int value) {
+        PlatformDependent.putByte(addr(index), (byte) value);
+    }
+
+    @Override
+    protected void _setShort(int index, int value) {
+        PlatformDependent.putShort(addr(index), NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value));
+    }
+
+    @Override
+    protected void _setMedium(int index, int value) {
+        long addr = addr(index);
+        PlatformDependent.putByte(addr, (byte) (value >>> 16));
+        PlatformDependent.putByte(addr + 1, (byte) (value >>> 8));
+        PlatformDependent.putByte(addr + 2, (byte) value);
+    }
+
+    @Override
+    protected void _setInt(int index, int value) {
+        PlatformDependent.putInt(addr(index), NATIVE_ORDER ? value : Integer.reverseBytes(value));
+    }
+
+    @Override
+    protected void _setLong(int index, long value) {
+        PlatformDependent.putLong(addr(index), NATIVE_ORDER ? value : Long.reverseBytes(value));
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+        checkIndex(index, length);
+        if (src == null) {
+            throw new NullPointerException("src");
+        }
+        if (srcIndex < 0 || srcIndex > src.capacity() - length) {
+            throw new IndexOutOfBoundsException("srcIndex: " + srcIndex);
+        }
+
+        if (length != 0) {
+            if (src.hasMemoryAddress()) {
+                PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr(index), length);
+            } else if (src.hasArray()) {
+                PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcIndex, addr(index), length);
+            } else {
+                src.getBytes(srcIndex, this, index, length);
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+        checkIndex(index, length);
+        if (length != 0) {
+            PlatformDependent.copyMemory(src, srcIndex, addr(index), length);
+        }
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuffer src) {
+        checkIndex(index);
+        ByteBuffer tmpBuf = internalNioBuffer();
+        if (src == tmpBuf) {
+            src = src.duplicate();
+        }
+
+        index = idx(index);
+        tmpBuf.clear().position(index).limit(index + src.remaining());
+        tmpBuf.put(src);
+        return this;
+    }
+
+    @Override
+    public int setBytes(int index, InputStream in, int length) throws IOException {
+        checkIndex(index, length);
+        byte[] tmp = new byte[length];
+        int readBytes = in.read(tmp);
+        if (readBytes > 0) {
+            PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes);
+        }
+        return readBytes;
+    }
+
+    @Override
+    public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+        checkIndex(index, length);
+        ByteBuffer tmpNioBuf = internalNioBuffer();
+        index = idx(index);
+        tmpNioBuf.clear().position(index).limit(index + length);
+        try {
+            return in.read(tmpNioBuf);
+        } catch (ClosedChannelException e) {
+            return -1;
+        }
+    }
+
+    @Override
+    public ByteBuf copy(int index, int length) {
+        checkIndex(index, length);
+        PooledUnsafeDirectByteBufL copy = (PooledUnsafeDirectByteBufL) alloc().directBuffer(length, maxCapacity());
+        if (length != 0) {
+            PlatformDependent.copyMemory(addr(index), copy.addr(0), length);
+            copy.setIndex(0, length);
+        }
+        return copy;
+    }
+
+    @Override
+    public int nioBufferCount() {
+        return 1;
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(int index, int length) {
+        return new ByteBuffer[] { nioBuffer(index, length) };
+    }
+
+    @Override
+    public ByteBuffer internalNioBuffer(int index, int length) {
+        checkIndex(index, length);
+        index = idx(index);
+        return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
+    }
+
+    @Override
+    public boolean hasArray() {
+        return false;
+    }
+
+    @Override
+    public byte[] array() {
+        throw new UnsupportedOperationException("direct buffer");
+    }
+
+    @Override
+    public int arrayOffset() {
+        throw new UnsupportedOperationException("direct buffer");
+    }
+
+    @Override
+    public boolean hasMemoryAddress() {
+        return true;
+    }
+
+    @Override
+    public long memoryAddress() {
+        return memoryAddress;
+    }
+
+    private long addr(int index) {
+        return memoryAddress + index;
+    }
+
+    @Override
+    protected Recycler<?> recycler() {
+        return RECYCLER;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index 63bf4ea..15d489d 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -59,6 +59,11 @@
       <version>1.0-SNAPSHOT</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <version>4.0.3.Final</version>
+      <artifactId>netty-bufferl</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.drill</groupId>
       <artifactId>common</artifactId>
       <version>1.0-SNAPSHOT</version>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
index 6ffa968..147762e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
@@ -118,12 +118,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
   }
   
-  public void copyValue(int inIndex, int outIndex, ${minor.class}Vector v){
+  public void copyFrom(int inIndex, int outIndex, ${minor.class}Vector v){
     <#if (type.width > 8)>
     data.getBytes(inIndex * ${type.width}, v.data, outIndex * ${type.width}, ${type.width});
     <#else> <#-- type.width <= 8 -->
     data.set${(minor.javaType!type.javaType)?cap_first}(outIndex * ${type.width}, 
-        data.get${(minor.javaType!type.javaType)?cap_first}(inIndex * ${type.width})
+        v.data.get${(minor.javaType!type.javaType)?cap_first}(inIndex * ${type.width})
     );
     </#if> <#-- type.width -->
   }
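
The second changed line in this hunk is the functional fix: the narrow-width branch used to read the value back out of this vector's own buffer (data) instead of the source vector's buffer (v.data), so a copy between two distinct vectors returned the destination's stale contents. A plain-array analogue of the corrected behaviour (hypothetical names, sketch only):

    // dst is the vector being filled; src plays the role of the 'v' argument
    static void copyFrom(int inIndex, int outIndex, long[] src, long[] dst) {
      dst[outIndex] = src[inIndex];   // the old code effectively did dst[outIndex] = dst[inIndex]
    }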

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
index e2387a7..644019e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
@@ -199,9 +199,9 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   }
 
   
-  public void copyValue(int inIndex, int outIndex, Nullable${minor.class}Vector v){
-    bits.copyValue(inIndex, outIndex, v.bits);
-    values.copyValue(inIndex, outIndex, v.values);
+  public void copyFrom(int inIndex, int outIndex, Nullable${minor.class}Vector v){
+    bits.copyFrom(inIndex, outIndex, v.bits);
+    values.copyFrom(inIndex, outIndex, v.values);
   }
   
   public final class Accessor implements ValueVector.Accessor{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index 99b24de..26564c1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -91,7 +91,7 @@ import com.google.common.collect.Lists;
     }
   }
   
-  public void copyValue(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+  public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 40363a7..95965f7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -119,7 +119,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     clear();
   }
   
-  public void copyValue(int inIndex, int outIndex, ${minor.class}Vector v){
+  public void copyFrom(int inIndex, int outIndex, ${minor.class}Vector v){
     int start = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(inIndex);
     int end =   offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(inIndex+1);
     int len = end - start;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index b2503c1..1b49d54 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -18,12 +18,11 @@
 package org.apache.drill.exec.client;
 
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Iterables.get;
 import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
 import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorL;
 import io.netty.channel.nio.NioEventLoopGroup;
 
 import java.io.Closeable;
@@ -57,8 +56,9 @@ public class DrillClient implements Closeable{
   
   DrillConfig config;
   private UserClient client;
-  private ClusterCoordinator clusterCoordinator;
-
+  private volatile ClusterCoordinator clusterCoordinator;
+  private volatile boolean connected = false;
+  
   public DrillClient() {
     this(DrillConfig.create());
   }
@@ -82,25 +82,33 @@ public class DrillClient implements Closeable{
    *
    * @throws IOException
    */
-  public void connect() throws Exception {
+  public synchronized void connect() throws RpcException {
+    if(connected) return;
+    
     if(clusterCoordinator == null){
-      this.clusterCoordinator = new ZKClusterCoordinator(this.config);
-      this.clusterCoordinator.start(10000);
+      try {
+        this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+        this.clusterCoordinator.start(10000);
+      } catch (Exception e) {
+        throw new RpcException("Failure setting up ZK for client.", e);
+      }
+      
     }
     
     Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints();
     checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
     // just use the first endpoint for now
     DrillbitEndpoint endpoint = endpoints.iterator().next();
-    ByteBufAllocator bb = new PooledByteBufAllocator(true);
+    ByteBufAllocator bb = new PooledByteBufAllocatorL(true);
     this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
     try {
       logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
       FutureHandler f = new FutureHandler();
       this.client.connect(f, endpoint);
       f.checkedGet();
+      connected = true;
     } catch (InterruptedException e) {
-      throw new IOException(e);
+      throw new RpcException(e);
     }
   }
 
@@ -111,6 +119,7 @@ public class DrillClient implements Closeable{
    */
   public void close() throws IOException {
     this.client.close();
+    connected = false;
   }
 
   /**
@@ -124,7 +133,17 @@ public class DrillClient implements Closeable{
     ListHoldingResultsListener listener = new ListHoldingResultsListener();
     client.submitQuery(listener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
     return listener.getResults();
-
+  }
+  
+  /**
+   * Submits a logical plan for direct execution (bypasses parsing); results are streamed to the listener.
+   *
+   * @param type the type of the submitted plan
+   * @param plan the plan to execute
+   * @param resultsListener listener that receives result batches as they arrive
+   */
+  public void runQuery(QueryType type, String plan, UserResultsListener resultsListener){
+    client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
   }
   
   private class ListHoldingResultsListener implements UserResultsListener {
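
With the new listener-based overload above, callers no longer have to block on the list-returning runQuery. A hedged usage sketch (planJson and myListener are hypothetical; QueryType is the user-protocol query type; exception handling omitted):

    DrillClient client = new DrillClient();
    client.connect();                                          // now synchronized and idempotent
    client.runQuery(QueryType.LOGICAL, planJson, myListener);  // batches stream to the listener
    client.close();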

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 5130f2b..8454bb7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -22,9 +22,9 @@ public class FunctionImplementationRegistry {
       FunctionHolder holder = converter.getHolder(clazz);
       if(holder != null){
         methods.put(holder.getFunctionName(), holder);
-        logger.debug("Registering function {}", holder);
+//        logger.debug("Registering function {}", holder);
       }else{
-        logger.debug("Unable to initialize function for class {}", clazz.getName());
+        logger.warn("Unable to initialize function for class {}", clazz.getName());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
index 027dae6..499a693 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
@@ -19,20 +19,21 @@ package org.apache.drill.exec.memory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorL;
 
-public class DirectBufferAllocator extends BufferAllocator{
+public class DirectBufferAllocator extends BufferAllocator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBufferAllocator.class);
 
-  private final PooledByteBufAllocator buffer = new PooledByteBufAllocator(true);
-  
+  private final PooledByteBufAllocatorL buffer = new PooledByteBufAllocatorL(true);
+
+  public DirectBufferAllocator() {
+  }
+
   @Override
   public ByteBuf buffer(int size) {
     // TODO: wrap it
     return buffer.directBuffer(size);
   }
-  
-  
 
   @Override
   protected boolean pre(int bytes) {
@@ -40,8 +41,6 @@ public class DirectBufferAllocator extends BufferAllocator{
     return true;
   }
 
-
-
   @Override
   public long getAllocatedMemory() {
     return 0;
@@ -52,11 +51,9 @@ public class DirectBufferAllocator extends BufferAllocator{
     return buffer;
   }
 
-  
-
   @Override
   public BufferAllocator getChildAllocator(long initialReservation, long maximumReservation) {
-    //TODO: Add child account allocator.
+    // TODO: Add child account allocator.
     return this;
   }
 
@@ -64,5 +61,5 @@ public class DirectBufferAllocator extends BufferAllocator{
   public void close() {
     // TODO: collect all buffers and release them away using a weak hashmap so we don't impact pool work
   }
-  
+
 }
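
The commit also adds TestEndianess (listed in the commit summary but not shown in this message). A rough sketch of the kind of assertion such a test can make, assuming JUnit and a little-endian host such as x86 (not the actual test source):

    BufferAllocator a = new DirectBufferAllocator();
    ByteBuf b = a.buffer(4);
    b.setInt(0, 1);                      // stored in native (little-endian) byte order
    assertEquals(1, b.getByte(0));       // low byte first, no byte swap performed
    assertEquals(0, b.getByte(3));
    b.release();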


Re: [13/13] git commit: Merge Jason's SQL updates to work with full exec. Random vector updates including changing to copyFrom Fixes to writable batch. Add an alternative ByteBuf implementation that leverages little endianness.

Posted by Tanujit Ghosh <ta...@gmail.com>.
One small issue in the build

[tanujit@legolas prototype]$ git diff
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 4aa4dae..d17a594 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -338,7 +338,7 @@ public class TestComparisonFunctions {
         }};

         PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance())
-        PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/Float8GreaterThan.json"), C
+        PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/functions/float8GreaterThan.json"), C
         FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
         FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, re
         SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator()


On Fri, Aug 2, 2013 at 8:33 AM, <ja...@apache.org> wrote:

> Merge Jason's SQL updates to work with full exec.
> Random vector updates including changing to copyFrom
> Fixes to writable batch.
> Add an alternative ByteBuf implementation that leverages little endianness.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/103072a6
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/103072a6
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/103072a6
>
> Branch: refs/heads/master
> Commit: 103072a619741d5e228fdb181501ec2f82e111a3
> Parents: 4e289f0
> Author: Jacques Nadeau <ja...@apache.org>
> Authored: Wed Jul 31 20:00:14 2013 -0700
> Committer: Jacques Nadeau <ja...@apache.org>
> Committed: Thu Aug 1 19:51:35 2013 -0700
>
> ----------------------------------------------------------------------
>  sandbox/prototype/exec/bufferl/pom.xml          |  35 ++
>  .../main/java/io/netty/buffer/PoolArenaL.java   | 425 +++++++++++++++++++
>  .../main/java/io/netty/buffer/PoolChunkL.java   | 348 +++++++++++++++
>  .../java/io/netty/buffer/PoolChunkListL.java    | 129 ++++++
>  .../main/java/io/netty/buffer/PoolSubpageL.java | 195 +++++++++
>  .../java/io/netty/buffer/PoolThreadCacheL.java  |  33 ++
>  .../netty/buffer/PooledByteBufAllocatorL.java   | 263 ++++++++++++
>  .../java/io/netty/buffer/PooledByteBufL.java    | 173 ++++++++
>  .../buffer/PooledUnsafeDirectByteBufL.java      | 342 +++++++++++++++
>  sandbox/prototype/exec/java-exec/pom.xml        |   5 +
>  .../templates/FixedValueVectors.java            |   4 +-
>  .../templates/NullableValueVectors.java         |   6 +-
>  .../templates/RepeatedValueVectors.java         |   2 +-
>  .../templates/VariableLengthVectors.java        |   2 +-
>  .../apache/drill/exec/client/DrillClient.java   |  39 +-
>  .../expr/fn/FunctionImplementationRegistry.java |   4 +-
>  .../exec/memory/DirectBufferAllocator.java      |  21 +-
>  .../apache/drill/exec/opt/BasicOptimizer.java   |  47 +-
>  .../physical/impl/filter/FilterTemplate.java    |  12 +-
>  .../materialize/VectorRecordMaterializer.java   |   1 +
>  .../impl/project/ProjectorTemplate.java         |   4 +-
>  .../physical/impl/svremover/CopierTemplate.java |   2 +-
>  .../impl/svremover/RemovingRecordBatch.java     |  12 +-
>  .../apache/drill/exec/record/WritableBatch.java |   5 +
>  .../exec/record/selection/SelectionVector2.java |   8 +-
>  .../apache/drill/exec/rpc/user/UserClient.java  |   8 +-
>  .../org/apache/drill/exec/vector/BitVector.java |   4 +-
>  .../apache/drill/exec/work/foreman/Foreman.java |   3 +
>  .../apache/drill/exec/memory/TestEndianess.java |  24 ++
>  sandbox/prototype/exec/pom.xml                  |   1 +
>  sandbox/prototype/sqlparser/pom.xml             |  23 +-
>  .../java/org/apache/drill/jdbc/DrillTable.java  |  82 +++-
>  .../org/apache/drill/optiq/EnumerableDrill.java | 250 -----------
>  .../drill/optiq/EnumerableDrillFullEngine.java  |  85 ++++
>  .../apache/drill/optiq/EnumerableDrillRel.java  |  20 +-
>  .../apache/drill/optiq/EnumerableDrillRule.java |  18 +-
>  .../drill/sql/client/full/BatchListener.java    |  48 +++
>  .../drill/sql/client/full/BatchLoaderMap.java   | 185 ++++++++
>  .../drill/sql/client/full/DrillFullImpl.java    |  64 +++
>  .../drill/sql/client/full/ResultEnumerator.java |  48 +++
>  .../drill/sql/client/ref/DrillRefImpl.java      | 240 +++++++++++
>  .../org/apache/drill/jdbc/test/JdbcAssert.java  |  49 ++-
>  .../org/apache/drill/jdbc/test/JdbcTest.java    | 168 +++++---
>  43 files changed, 3024 insertions(+), 413 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/pom.xml
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/bufferl/pom.xml
> b/sandbox/prototype/exec/bufferl/pom.xml
> new file mode 100644
> index 0000000..baa2c3d
> --- /dev/null
> +++ b/sandbox/prototype/exec/bufferl/pom.xml
> @@ -0,0 +1,35 @@
> +<?xml version="1.0" encoding="UTF-8"?>
> +<!--
> +  ~ Copyright 2012 The Netty Project
> +  ~
> +  ~ The Netty Project licenses this file to you under the Apache License,
> +  ~ version 2.0 (the "License"); you may not use this file except in
> compliance
> +  ~ with the License. You may obtain a copy of the License at:
> +  ~
> +  ~   http://www.apache.org/licenses/LICENSE-2.0
> +  ~
> +  ~ Unless required by applicable law or agreed to in writing, software
> +  ~ distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> +  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> +  ~ License for the specific language governing permissions and
> limitations
> +  ~ under the License.
> +  -->
> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
> http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd
> ">
> +
> +  <modelVersion>4.0.0</modelVersion>
> +
> +  <groupId>org.apache.drill.exec</groupId>
> +  <version>4.0.3.Final</version>
> +  <artifactId>netty-bufferl</artifactId>
> +
> +  <name>Netty/Drill/Buffer</name>
> +
> +  <dependencies>
> +    <dependency>
> +      <groupId>io.netty</groupId>
> +      <artifactId>netty-buffer</artifactId>
> +      <version>${project.version}</version>
> +    </dependency>
> +  </dependencies>
> +
> +</project>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
> new file mode 100644
> index 0000000..db9818d
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
> @@ -0,0 +1,425 @@
> +/*
> + * Copyright 2012 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +import io.netty.util.internal.PlatformDependent;
> +import io.netty.util.internal.StringUtil;
> +
> +import java.nio.ByteBuffer;
> +
> +abstract class PoolArenaL<T> {
> +
> +    final PooledByteBufAllocatorL parent;
> +
> +    private final int pageSize;
> +    private final int maxOrder;
> +    private final int pageShifts;
> +    private final int chunkSize;
> +    private final int subpageOverflowMask;
> +
> +    private final PoolSubpageL<T>[] tinySubpagePools;
> +    private final PoolSubpageL<T>[] smallSubpagePools;
> +
> +    private final PoolChunkListL<T> q050;
> +    private final PoolChunkListL<T> q025;
> +    private final PoolChunkListL<T> q000;
> +    private final PoolChunkListL<T> qInit;
> +    private final PoolChunkListL<T> q075;
> +    private final PoolChunkListL<T> q100;
> +
> +    // TODO: Test if adding padding helps under contention
> +    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
> +
> +    protected PoolArenaL(PooledByteBufAllocatorL parent, int pageSize,
> int maxOrder, int pageShifts, int chunkSize) {
> +        this.parent = parent;
> +        this.pageSize = pageSize;
> +        this.maxOrder = maxOrder;
> +        this.pageShifts = pageShifts;
> +        this.chunkSize = chunkSize;
> +        subpageOverflowMask = ~(pageSize - 1);
> +
> +        tinySubpagePools = newSubpagePoolArray(512 >>> 4);
> +        for (int i = 0; i < tinySubpagePools.length; i ++) {
> +            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
> +        }
> +
> +        smallSubpagePools = newSubpagePoolArray(pageShifts - 9);
> +        for (int i = 0; i < smallSubpagePools.length; i ++) {
> +            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
> +        }
> +
> +        q100 = new PoolChunkListL<T>(this, null, 100, Integer.MAX_VALUE);
> +        q075 = new PoolChunkListL<T>(this, q100, 75, 100);
> +        q050 = new PoolChunkListL<T>(this, q075, 50, 100);
> +        q025 = new PoolChunkListL<T>(this, q050, 25, 75);
> +        q000 = new PoolChunkListL<T>(this, q025, 1, 50);
> +        qInit = new PoolChunkListL<T>(this, q000, Integer.MIN_VALUE, 25);
> +
> +        q100.prevList = q075;
> +        q075.prevList = q050;
> +        q050.prevList = q025;
> +        q025.prevList = q000;
> +        q000.prevList = null;
> +        qInit.prevList = qInit;
> +    }
> +
> +    private PoolSubpageL<T> newSubpagePoolHead(int pageSize) {
> +        PoolSubpageL<T> head = new PoolSubpageL<T>(pageSize);
> +        head.prev = head;
> +        head.next = head;
> +        return head;
> +    }
> +
> +    @SuppressWarnings("unchecked")
> +    private PoolSubpageL<T>[] newSubpagePoolArray(int size) {
> +        return new PoolSubpageL[size];
> +    }
> +
> +    PooledByteBufL<T> allocate(PoolThreadCacheL cache, int reqCapacity,
> int maxCapacity) {
> +        PooledByteBufL<T> buf = newByteBuf(maxCapacity);
> +        allocate(cache, buf, reqCapacity);
> +        return buf;
> +    }
> +
> +    private void allocate(PoolThreadCacheL cache, PooledByteBufL<T> buf,
> final int reqCapacity) {
> +        final int normCapacity = normalizeCapacity(reqCapacity);
> +        if ((normCapacity & subpageOverflowMask) == 0) { // capacity <
> pageSize
> +            int tableIdx;
> +            PoolSubpageL<T>[] table;
> +            if ((normCapacity & 0xFFFFFE00) == 0) { // < 512
> +                tableIdx = normCapacity >>> 4;
> +                table = tinySubpagePools;
> +            } else {
> +                tableIdx = 0;
> +                int i = normCapacity >>> 10;
> +                while (i != 0) {
> +                    i >>>= 1;
> +                    tableIdx ++;
> +                }
> +                table = smallSubpagePools;
> +            }
> +
> +            synchronized (this) {
> +                final PoolSubpageL<T> head = table[tableIdx];
> +                final PoolSubpageL<T> s = head.next;
> +                if (s != head) {
> +                    assert s.doNotDestroy && s.elemSize == normCapacity;
> +                    long handle = s.allocate();
> +                    assert handle >= 0;
> +                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
> +                    return;
> +                }
> +            }
> +        } else if (normCapacity > chunkSize) {
> +            allocateHuge(buf, reqCapacity);
> +            return;
> +        }
> +
> +        allocateNormal(buf, reqCapacity, normCapacity);
> +    }
> +
> +    private synchronized void allocateNormal(PooledByteBufL<T> buf, int
> reqCapacity, int normCapacity) {
> +        if (q050.allocate(buf, reqCapacity, normCapacity) ||
> q025.allocate(buf, reqCapacity, normCapacity) ||
> +            q000.allocate(buf, reqCapacity, normCapacity) ||
> qInit.allocate(buf, reqCapacity, normCapacity) ||
> +            q075.allocate(buf, reqCapacity, normCapacity) ||
> q100.allocate(buf, reqCapacity, normCapacity)) {
> +            return;
> +        }
> +
> +        // Add a new chunk.
> +        PoolChunkL<T> c = newChunk(pageSize, maxOrder, pageShifts,
> chunkSize);
> +        long handle = c.allocate(normCapacity);
> +        assert handle > 0;
> +        c.initBuf(buf, handle, reqCapacity);
> +        qInit.add(c);
> +    }
> +
> +    private void allocateHuge(PooledByteBufL<T> buf, int reqCapacity) {
> +        buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
> +    }
> +
> +    synchronized void free(PoolChunkL<T> chunk, long handle) {
> +        if (chunk.unpooled) {
> +            destroyChunk(chunk);
> +        } else {
> +            chunk.parent.free(chunk, handle);
> +        }
> +    }
> +
> +    PoolSubpageL<T> findSubpagePoolHead(int elemSize) {
> +        int tableIdx;
> +        PoolSubpageL<T>[] table;
> +        if ((elemSize & 0xFFFFFE00) == 0) { // < 512
> +            tableIdx = elemSize >>> 4;
> +            table = tinySubpagePools;
> +        } else {
> +            tableIdx = 0;
> +            elemSize >>>= 10;
> +            while (elemSize != 0) {
> +                elemSize >>>= 1;
> +                tableIdx ++;
> +            }
> +            table = smallSubpagePools;
> +        }
> +
> +        return table[tableIdx];
> +    }
> +
> +    private int normalizeCapacity(int reqCapacity) {
> +        if (reqCapacity < 0) {
> +            throw new IllegalArgumentException("capacity: " + reqCapacity
> + " (expected: 0+)");
> +        }
> +        if (reqCapacity >= chunkSize) {
> +            return reqCapacity;
> +        }
> +
> +        if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512
> +            // Doubled
> +
> +            int normalizedCapacity = reqCapacity;
> +            normalizedCapacity |= normalizedCapacity >>>  1;
> +            normalizedCapacity |= normalizedCapacity >>>  2;
> +            normalizedCapacity |= normalizedCapacity >>>  4;
> +            normalizedCapacity |= normalizedCapacity >>>  8;
> +            normalizedCapacity |= normalizedCapacity >>> 16;
> +            normalizedCapacity ++;
> +
> +            if (normalizedCapacity < 0) {
> +                normalizedCapacity >>>= 1;
> +            }
> +
> +            return normalizedCapacity;
> +        }
> +
> +        // Quantum-spaced
> +        if ((reqCapacity & 15) == 0) {
> +            return reqCapacity;
> +        }
> +
> +        return (reqCapacity & ~15) + 16;
> +    }
> +
> +    void reallocate(PooledByteBufL<T> buf, int newCapacity, boolean
> freeOldMemory) {
> +        if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
> +            throw new IllegalArgumentException("newCapacity: " +
> newCapacity);
> +        }
> +
> +        int oldCapacity = buf.length;
> +        if (oldCapacity == newCapacity) {
> +            return;
> +        }
> +
> +        PoolChunkL<T> oldChunk = buf.chunk;
> +        long oldHandle = buf.handle;
> +        T oldMemory = buf.memory;
> +        int oldOffset = buf.offset;
> +
> +        int readerIndex = buf.readerIndex();
> +        int writerIndex = buf.writerIndex();
> +
> +        allocate(parent.threadCache.get(), buf, newCapacity);
> +        if (newCapacity > oldCapacity) {
> +            memoryCopy(
> +                    oldMemory, oldOffset + readerIndex,
> +                    buf.memory, buf.offset + readerIndex, writerIndex -
> readerIndex);
> +        } else if (newCapacity < oldCapacity) {
> +            if (readerIndex < newCapacity) {
> +                if (writerIndex > newCapacity) {
> +                    writerIndex = newCapacity;
> +                }
> +                memoryCopy(
> +                        oldMemory, oldOffset + readerIndex,
> +                        buf.memory, buf.offset + readerIndex, writerIndex
> - readerIndex);
> +            } else {
> +                readerIndex = writerIndex = newCapacity;
> +            }
> +        }
> +
> +        buf.setIndex(readerIndex, writerIndex);
> +
> +        if (freeOldMemory) {
> +            free(oldChunk, oldHandle);
> +        }
> +    }
> +
> +    protected abstract PoolChunkL<T> newChunk(int pageSize, int maxOrder,
> int pageShifts, int chunkSize);
> +    protected abstract PoolChunkL<T> newUnpooledChunk(int capacity);
> +    protected abstract PooledByteBufL<T> newByteBuf(int maxCapacity);
> +    protected abstract void memoryCopy(T src, int srcOffset, T dst, int
> dstOffset, int length);
> +    protected abstract void destroyChunk(PoolChunkL<T> chunk);
> +
> +    public synchronized String toString() {
> +        StringBuilder buf = new StringBuilder();
> +        buf.append("Chunk(s) at 0~25%:");
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append(qInit);
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append("Chunk(s) at 0~50%:");
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append(q000);
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append("Chunk(s) at 25~75%:");
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append(q025);
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append("Chunk(s) at 50~100%:");
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append(q050);
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append("Chunk(s) at 75~100%:");
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append(q075);
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append("Chunk(s) at 100%:");
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append(q100);
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append("tiny subpages:");
> +        for (int i = 1; i < tinySubpagePools.length; i ++) {
> +            PoolSubpageL<T> head = tinySubpagePools[i];
> +            if (head.next == head) {
> +                continue;
> +            }
> +
> +            buf.append(StringUtil.NEWLINE);
> +            buf.append(i);
> +            buf.append(": ");
> +            PoolSubpageL<T> s = head.next;
> +            for (;;) {
> +                buf.append(s);
> +                s = s.next;
> +                if (s == head) {
> +                    break;
> +                }
> +            }
> +        }
> +        buf.append(StringUtil.NEWLINE);
> +        buf.append("small subpages:");
> +        for (int i = 1; i < smallSubpagePools.length; i ++) {
> +            PoolSubpageL<T> head = smallSubpagePools[i];
> +            if (head.next == head) {
> +                continue;
> +            }
> +
> +            buf.append(StringUtil.NEWLINE);
> +            buf.append(i);
> +            buf.append(": ");
> +            PoolSubpageL<T> s = head.next;
> +            for (;;) {
> +                buf.append(s);
> +                s = s.next;
> +                if (s == head) {
> +                    break;
> +                }
> +            }
> +        }
> +        buf.append(StringUtil.NEWLINE);
> +
> +        return buf.toString();
> +    }
> +
> +    static final class HeapArena extends PoolArenaL<byte[]> {
> +
> +        HeapArena(PooledByteBufAllocatorL parent, int pageSize, int
> maxOrder, int pageShifts, int chunkSize) {
> +            super(parent, pageSize, maxOrder, pageShifts, chunkSize);
> +        }
> +
> +        @Override
> +        protected PoolChunkL<byte[]> newChunk(int pageSize, int maxOrder,
> int pageShifts, int chunkSize) {
> +            return new PoolChunkL<byte[]>(this, new byte[chunkSize],
> pageSize, maxOrder, pageShifts, chunkSize);
> +        }
> +
> +        @Override
> +        protected PoolChunkL<byte[]> newUnpooledChunk(int capacity) {
> +            return new PoolChunkL<byte[]>(this, new byte[capacity],
> capacity);
> +        }
> +
> +        @Override
> +        protected void destroyChunk(PoolChunkL<byte[]> chunk) {
> +            // Rely on GC.
> +        }
> +
> +        @Override
> +        protected PooledByteBufL<byte[]> newByteBuf(int maxCapacity) {
> +          throw new UnsupportedOperationException();
> +//            return PooledHeapByteBufL.newInstance(maxCapacity);
> +        }
> +
> +        @Override
> +        protected void memoryCopy(byte[] src, int srcOffset, byte[] dst,
> int dstOffset, int length) {
> +            if (length == 0) {
> +                return;
> +            }
> +
> +            System.arraycopy(src, srcOffset, dst, dstOffset, length);
> +        }
> +    }
> +
> +    static final class DirectArena extends PoolArenaL<ByteBuffer> {
> +
> +        private static final boolean HAS_UNSAFE =
> PlatformDependent.hasUnsafe();
> +
> +        DirectArena(PooledByteBufAllocatorL parent, int pageSize, int
> maxOrder, int pageShifts, int chunkSize) {
> +            super(parent, pageSize, maxOrder, pageShifts, chunkSize);
> +        }
> +
> +        @Override
> +        protected PoolChunkL<ByteBuffer> newChunk(int pageSize, int
> maxOrder, int pageShifts, int chunkSize) {
> +            return new PoolChunkL<ByteBuffer>(
> +                    this, ByteBuffer.allocateDirect(chunkSize), pageSize,
> maxOrder, pageShifts, chunkSize);
> +        }
> +
> +        @Override
> +        protected PoolChunkL<ByteBuffer> newUnpooledChunk(int capacity) {
> +            return new PoolChunkL<ByteBuffer>(this,
> ByteBuffer.allocateDirect(capacity), capacity);
> +        }
> +
> +        @Override
> +        protected void destroyChunk(PoolChunkL<ByteBuffer> chunk) {
> +            PlatformDependent.freeDirectBuffer(chunk.memory);
> +        }
> +
> +        @Override
> +        protected PooledByteBufL<ByteBuffer> newByteBuf(int maxCapacity) {
> +            if (HAS_UNSAFE) {
> +                return
> PooledUnsafeDirectByteBufL.newInstance(maxCapacity);
> +            } else {
> +              throw new UnsupportedOperationException();
> +//                return PooledDirectByteBufL.newInstance(maxCapacity);
> +            }
> +        }
> +
> +        @Override
> +        protected void memoryCopy(ByteBuffer src, int srcOffset,
> ByteBuffer dst, int dstOffset, int length) {
> +            if (length == 0) {
> +                return;
> +            }
> +
> +            if (HAS_UNSAFE) {
> +                PlatformDependent.copyMemory(
> +                        PlatformDependent.directBufferAddress(src) +
> srcOffset,
> +                        PlatformDependent.directBufferAddress(dst) +
> dstOffset, length);
> +            } else {
> +                // We must duplicate the NIO buffers because they may be
> accessed by other Netty buffers.
> +                src = src.duplicate();
> +                dst = dst.duplicate();
> +                src.position(srcOffset).limit(srcOffset + length);
> +                dst.position(dstOffset);
> +                dst.put(src);
> +            }
> +        }
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java
> new file mode 100644
> index 0000000..8e8f32b
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkL.java
> @@ -0,0 +1,348 @@
> +/*
> + * Copyright 2012 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +final class PoolChunkL<T> {
> +    private static final int ST_UNUSED = 0;
> +    private static final int ST_BRANCH = 1;
> +    private static final int ST_ALLOCATED = 2;
> +    private static final int ST_ALLOCATED_SUBPAGE = ST_ALLOCATED | 1;
> +
> +    private static final long multiplier = 0x5DEECE66DL;
> +    private static final long addend = 0xBL;
> +    private static final long mask = (1L << 48) - 1;
> +
> +    final PoolArenaL<T> arena;
> +    final T memory;
> +    final boolean unpooled;
> +
> +    private final int[] memoryMap;
> +    private final PoolSubpageL<T>[] subpages;
> +    /** Used to determine if the requested capacity is equal to or
> greater than pageSize. */
> +    private final int subpageOverflowMask;
> +    private final int pageSize;
> +    private final int pageShifts;
> +
> +    private final int chunkSize;
> +    private final int maxSubpageAllocs;
> +
> +    private long random = (System.nanoTime() ^ multiplier) & mask;
> +
> +    private int freeBytes;
> +
> +    PoolChunkListL<T> parent;
> +    PoolChunkL<T> prev;
> +    PoolChunkL<T> next;
> +
> +    // TODO: Test if adding padding helps under contention
> +    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
> +
> +    PoolChunkL(PoolArenaL<T> arena, T memory, int pageSize, int maxOrder,
> int pageShifts, int chunkSize) {
> +        unpooled = false;
> +        this.arena = arena;
> +        this.memory = memory;
> +        this.pageSize = pageSize;
> +        this.pageShifts = pageShifts;
> +        this.chunkSize = chunkSize;
> +        subpageOverflowMask = ~(pageSize - 1);
> +        freeBytes = chunkSize;
> +
> +        int chunkSizeInPages = chunkSize >>> pageShifts;
> +        maxSubpageAllocs = 1 << maxOrder;
> +
> +        // Generate the memory map.
> +        memoryMap = new int[maxSubpageAllocs << 1];
> +        int memoryMapIndex = 1;
> +        for (int i = 0; i <= maxOrder; i ++) {
> +            int runSizeInPages = chunkSizeInPages >>> i;
> +            for (int j = 0; j < chunkSizeInPages; j += runSizeInPages) {
> +                //noinspection PointlessBitwiseExpression
> +                memoryMap[memoryMapIndex ++] = j << 17 | runSizeInPages
> << 2 | ST_UNUSED;
> +            }
> +        }
> +
> +        subpages = newSubpageArray(maxSubpageAllocs);
> +    }
> +
> +    /** Creates a special chunk that is not pooled. */
> +    PoolChunkL(PoolArenaL<T> arena, T memory, int size) {
> +        unpooled = true;
> +        this.arena = arena;
> +        this.memory = memory;
> +        memoryMap = null;
> +        subpages = null;
> +        subpageOverflowMask = 0;
> +        pageSize = 0;
> +        pageShifts = 0;
> +        chunkSize = size;
> +        maxSubpageAllocs = 0;
> +    }
> +
> +    @SuppressWarnings("unchecked")
> +    private PoolSubpageL<T>[] newSubpageArray(int size) {
> +        return new PoolSubpageL[size];
> +    }
> +
> +    int usage() {
> +        if (freeBytes == 0) {
> +            return 100;
> +        }
> +
> +        int freePercentage = (int) (freeBytes * 100L / chunkSize);
> +        if (freePercentage == 0) {
> +            return 99;
> +        }
> +        return 100 - freePercentage;
> +    }
> +
> +    long allocate(int normCapacity) {
> +        int firstVal = memoryMap[1];
> +        if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
> +            return allocateRun(normCapacity, 1, firstVal);
> +        } else {
> +            return allocateSubpage(normCapacity, 1, firstVal);
> +        }
> +    }
> +
> +    private long allocateRun(int normCapacity, int curIdx, int val) {
> +        for (;;) {
> +            if ((val & ST_ALLOCATED) != 0) { // state == ST_ALLOCATED ||
> state == ST_ALLOCATED_SUBPAGE
> +                return -1;
> +            }
> +
> +            if ((val & ST_BRANCH) != 0) { // state == ST_BRANCH
> +                int nextIdx = curIdx << 1 ^ nextRandom();
> +                long res = allocateRun(normCapacity, nextIdx,
> memoryMap[nextIdx]);
> +                if (res > 0) {
> +                    return res;
> +                }
> +
> +                curIdx = nextIdx ^ 1;
> +                val = memoryMap[curIdx];
> +                continue;
> +            }
> +
> +            // state == ST_UNUSED
> +            return allocateRunSimple(normCapacity, curIdx, val);
> +        }
> +    }
> +
> +    private long allocateRunSimple(int normCapacity, int curIdx, int val)
> {
> +        int runLength = runLength(val);
> +        if (normCapacity > runLength) {
> +            return -1;
> +        }
> +
> +        for (;;) {
> +            if (normCapacity == runLength) {
> +                // Found the run that fits.
> +                // Note that capacity has been normalized already, so we
> don't need to deal with
> +                // the values that are not power of 2.
> +                memoryMap[curIdx] = val & ~3 | ST_ALLOCATED;
> +                freeBytes -= runLength;
> +                return curIdx;
> +            }
> +
> +            int nextIdx = curIdx << 1 ^ nextRandom();
> +            int unusedIdx = nextIdx ^ 1;
> +
> +            memoryMap[curIdx] = val & ~3 | ST_BRANCH;
> +            //noinspection PointlessBitwiseExpression
> +            memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED;
> +
> +            runLength >>>= 1;
> +            curIdx = nextIdx;
> +            val = memoryMap[curIdx];
> +        }
> +    }
> +
> +    private long allocateSubpage(int normCapacity, int curIdx, int val) {
> +        int state = val & 3;
> +        if (state == ST_BRANCH) {
> +            int nextIdx = curIdx << 1 ^ nextRandom();
> +            long res = branchSubpage(normCapacity, nextIdx);
> +            if (res > 0) {
> +                return res;
> +            }
> +
> +            return branchSubpage(normCapacity, nextIdx ^ 1);
> +        }
> +
> +        if (state == ST_UNUSED) {
> +            return allocateSubpageSimple(normCapacity, curIdx, val);
> +        }
> +
> +        if (state == ST_ALLOCATED_SUBPAGE) {
> +            PoolSubpageL<T> subpage = subpages[subpageIdx(curIdx)];
> +            int elemSize = subpage.elemSize;
> +            if (normCapacity != elemSize) {
> +                return -1;
> +            }
> +
> +            return subpage.allocate();
> +        }
> +
> +        return -1;
> +    }
> +
> +    private long allocateSubpageSimple(int normCapacity, int curIdx, int
> val) {
> +        int runLength = runLength(val);
> +        for (;;) {
> +            if (runLength == pageSize) {
> +                memoryMap[curIdx] = val & ~3 | ST_ALLOCATED_SUBPAGE;
> +                freeBytes -= runLength;
> +
> +                int subpageIdx = subpageIdx(curIdx);
> +                PoolSubpageL<T> subpage = subpages[subpageIdx];
> +                if (subpage == null) {
> +                    subpage = new PoolSubpageL<T>(this, curIdx,
> runOffset(val), pageSize, normCapacity);
> +                    subpages[subpageIdx] = subpage;
> +                } else {
> +                    subpage.init(normCapacity);
> +                }
> +                return subpage.allocate();
> +            }
> +
> +            int nextIdx = curIdx << 1 ^ nextRandom();
> +            int unusedIdx = nextIdx ^ 1;
> +
> +            memoryMap[curIdx] = val & ~3 | ST_BRANCH;
> +            //noinspection PointlessBitwiseExpression
> +            memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED;
> +
> +            runLength >>>= 1;
> +            curIdx = nextIdx;
> +            val = memoryMap[curIdx];
> +        }
> +    }
> +
> +    private long branchSubpage(int normCapacity, int nextIdx) {
> +        int nextVal = memoryMap[nextIdx];
> +        if ((nextVal & 3) != ST_ALLOCATED) {
> +            return allocateSubpage(normCapacity, nextIdx, nextVal);
> +        }
> +        return -1;
> +    }
> +
> +    void free(long handle) {
> +        int memoryMapIdx = (int) handle;
> +        int bitmapIdx = (int) (handle >>> 32);
> +
> +        int val = memoryMap[memoryMapIdx];
> +        int state = val & 3;
> +        if (state == ST_ALLOCATED_SUBPAGE) {
> +            assert bitmapIdx != 0;
> +            PoolSubpageL<T> subpage = subpages[subpageIdx(memoryMapIdx)];
> +            assert subpage != null && subpage.doNotDestroy;
> +            if (subpage.free(bitmapIdx & 0x3FFFFFFF)) {
> +                return;
> +            }
> +        } else {
> +            assert state == ST_ALLOCATED : "state: " + state;
> +            assert bitmapIdx == 0;
> +        }
> +
> +        freeBytes += runLength(val);
> +
> +        for (;;) {
> +            //noinspection PointlessBitwiseExpression
> +            memoryMap[memoryMapIdx] = val & ~3 | ST_UNUSED;
> +            if (memoryMapIdx == 1) {
> +                assert freeBytes == chunkSize;
> +                return;
> +            }
> +
> +            if ((memoryMap[siblingIdx(memoryMapIdx)] & 3) != ST_UNUSED) {
> +                break;
> +            }
> +
> +            memoryMapIdx = parentIdx(memoryMapIdx);
> +            val = memoryMap[memoryMapIdx];
> +        }
> +    }
> +
> +    void initBuf(PooledByteBufL<T> buf, long handle, int reqCapacity) {
> +        int memoryMapIdx = (int) handle;
> +        int bitmapIdx = (int) (handle >>> 32);
> +        if (bitmapIdx == 0) {
> +            int val = memoryMap[memoryMapIdx];
> +            assert (val & 3) == ST_ALLOCATED : String.valueOf(val & 3);
> +            buf.init(this, handle, runOffset(val), reqCapacity,
> runLength(val));
> +        } else {
> +            initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
> +        }
> +    }
> +
> +    void initBufWithSubpage(PooledByteBufL<T> buf, long handle, int
> reqCapacity) {
> +        initBufWithSubpage(buf, handle, (int) (handle >>> 32),
> reqCapacity);
> +    }
> +
> +    private void initBufWithSubpage(PooledByteBufL<T> buf, long handle,
> int bitmapIdx, int reqCapacity) {
> +        assert bitmapIdx != 0;
> +
> +        int memoryMapIdx = (int) handle;
> +        int val = memoryMap[memoryMapIdx];
> +        assert (val & 3) == ST_ALLOCATED_SUBPAGE;
> +
> +        PoolSubpageL<T> subpage = subpages[subpageIdx(memoryMapIdx)];
> +        assert subpage.doNotDestroy;
> +        assert reqCapacity <= subpage.elemSize;
> +
> +        buf.init(
> +                this, handle,
> +                runOffset(val) + (bitmapIdx & 0x3FFFFFFF) *
> subpage.elemSize, reqCapacity, subpage.elemSize);
> +    }
> +
> +    private static int parentIdx(int memoryMapIdx) {
> +        return memoryMapIdx >>> 1;
> +    }
> +
> +    private static int siblingIdx(int memoryMapIdx) {
> +        return memoryMapIdx ^ 1;
> +    }
> +
> +    private int runLength(int val) {
> +        return (val >>> 2 & 0x7FFF) << pageShifts;
> +    }
> +
> +    private int runOffset(int val) {
> +        return val >>> 17 << pageShifts;
> +    }
> +
> +    private int subpageIdx(int memoryMapIdx) {
> +        return memoryMapIdx - maxSubpageAllocs;
> +    }
> +
> +    private int nextRandom() {
> +        random = random * multiplier + addend & mask;
> +        return (int) (random >>> 47) & 1;
> +    }
> +
> +    public String toString() {
> +        StringBuilder buf = new StringBuilder();
> +        buf.append("Chunk(");
> +        buf.append(Integer.toHexString(System.identityHashCode(this)));
> +        buf.append(": ");
> +        buf.append(usage());
> +        buf.append("%, ");
> +        buf.append(chunkSize - freeBytes);
> +        buf.append('/');
> +        buf.append(chunkSize);
> +        buf.append(')');
> +        return buf.toString();
> +    }
> +}
>
>
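A quick note on the memoryMap encoding above: each entry packs a run's page
offset, its size in pages and a two-bit state into one int, which runOffset()
and runLength() later unpack. The standalone sketch below (the class name and
the pageShifts value of 13, i.e. log2 of the default 8192-byte page, are
illustrative assumptions) just decodes one such packed value:

    // Sketch only: decodes the packed memoryMap layout used by PoolChunkL above.
    public class MemoryMapDecodeDemo {
        static final int PAGE_SHIFTS = 13; // assumed: log2(8192), the default page size

        public static void main(String[] args) {
            int runOffsetInPages = 3;
            int runSizeInPages = 4;
            int stUnused = 0;                                  // ST_UNUSED
            int val = runOffsetInPages << 17 | runSizeInPages << 2 | stUnused;

            int state = val & 3;                               // low two bits hold the state
            int runLengthBytes = (val >>> 2 & 0x7FFF) << PAGE_SHIFTS;
            int runOffsetBytes = val >>> 17 << PAGE_SHIFTS;

            System.out.println("state=" + state
                + " runLength=" + runLengthBytes               // 32768 (4 pages)
                + " runOffset=" + runOffsetBytes);             // 24576 (3 pages in)
        }
    }
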
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java
> new file mode 100644
> index 0000000..4e0fb88
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolChunkListL.java
> @@ -0,0 +1,129 @@
> +/*
> + * Copyright 2012 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +import io.netty.util.internal.StringUtil;
> +
> +final class PoolChunkListL<T> {
> +    private final PoolArenaL<T> arena;
> +    private final PoolChunkListL<T> nextList;
> +    PoolChunkListL<T> prevList;
> +
> +    private final int minUsage;
> +    private final int maxUsage;
> +
> +    private PoolChunkL<T> head;
> +
> +    // TODO: Test if adding padding helps under contention
> +    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
> +
> +    PoolChunkListL(PoolArenaL<T> arena, PoolChunkListL<T> nextList, int
> minUsage, int maxUsage) {
> +        this.arena = arena;
> +        this.nextList = nextList;
> +        this.minUsage = minUsage;
> +        this.maxUsage = maxUsage;
> +    }
> +
> +    boolean allocate(PooledByteBufL<T> buf, int reqCapacity, int
> normCapacity) {
> +        if (head == null) {
> +            return false;
> +        }
> +
> +        for (PoolChunkL<T> cur = head;;) {
> +            long handle = cur.allocate(normCapacity);
> +            if (handle < 0) {
> +                cur = cur.next;
> +                if (cur == null) {
> +                    return false;
> +                }
> +            } else {
> +                cur.initBuf(buf, handle, reqCapacity);
> +                if (cur.usage() >= maxUsage) {
> +                    remove(cur);
> +                    nextList.add(cur);
> +                }
> +                return true;
> +            }
> +        }
> +    }
> +
> +    void free(PoolChunkL<T> chunk, long handle) {
> +        chunk.free(handle);
> +        if (chunk.usage() < minUsage) {
> +            remove(chunk);
> +            if (prevList == null) {
> +                assert chunk.usage() == 0;
> +                arena.destroyChunk(chunk);
> +            } else {
> +                prevList.add(chunk);
> +            }
> +        }
> +    }
> +
> +    void add(PoolChunkL<T> chunk) {
> +        if (chunk.usage() >= maxUsage) {
> +            nextList.add(chunk);
> +            return;
> +        }
> +
> +        chunk.parent = this;
> +        if (head == null) {
> +            head = chunk;
> +            chunk.prev = null;
> +            chunk.next = null;
> +        } else {
> +            chunk.prev = null;
> +            chunk.next = head;
> +            head.prev = chunk;
> +            head = chunk;
> +        }
> +    }
> +
> +    private void remove(PoolChunkL<T> cur) {
> +        if (cur == head) {
> +            head = cur.next;
> +            if (head != null) {
> +                head.prev = null;
> +            }
> +        } else {
> +            PoolChunkL<T> next = cur.next;
> +            cur.prev.next = next;
> +            if (next != null) {
> +                next.prev = cur.prev;
> +            }
> +        }
> +    }
> +
> +    @Override
> +    public String toString() {
> +        if (head == null) {
> +            return "none";
> +        }
> +
> +        StringBuilder buf = new StringBuilder();
> +        for (PoolChunkL<T> cur = head;;) {
> +            buf.append(cur);
> +            cur = cur.next;
> +            if (cur == null) {
> +                break;
> +            }
> +            buf.append(StringUtil.NEWLINE);
> +        }
> +
> +        return buf.toString();
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java
> new file mode 100644
> index 0000000..96c3efe
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolSubpageL.java
> @@ -0,0 +1,195 @@
> +/*
> + * Copyright 2012 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +final class PoolSubpageL<T> {
> +
> +    final PoolChunkL<T> chunk;
> +    final int memoryMapIdx;
> +    final int runOffset;
> +    final int pageSize;
> +    final long[] bitmap;
> +
> +    PoolSubpageL<T> prev;
> +    PoolSubpageL<T> next;
> +
> +    boolean doNotDestroy;
> +    int elemSize;
> +    int maxNumElems;
> +    int nextAvail;
> +    int bitmapLength;
> +    int numAvail;
> +
> +    // TODO: Test if adding padding helps under contention
> +    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
> +
> +    /** Special constructor that creates a linked list head */
> +    PoolSubpageL(int pageSize) {
> +        chunk = null;
> +        memoryMapIdx = -1;
> +        runOffset = -1;
> +        elemSize = -1;
> +        this.pageSize = pageSize;
> +        bitmap = null;
> +    }
> +
> +    PoolSubpageL(PoolChunkL<T> chunk, int memoryMapIdx, int runOffset,
> int pageSize, int elemSize) {
> +        this.chunk = chunk;
> +        this.memoryMapIdx = memoryMapIdx;
> +        this.runOffset = runOffset;
> +        this.pageSize = pageSize;
> +        bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
> +        init(elemSize);
> +    }
> +
> +    void init(int elemSize) {
> +        doNotDestroy = true;
> +        this.elemSize = elemSize;
> +        if (elemSize != 0) {
> +            maxNumElems = numAvail = pageSize / elemSize;
> +            nextAvail = 0;
> +            bitmapLength = maxNumElems >>> 6;
> +            if ((maxNumElems & 63) != 0) {
> +                bitmapLength ++;
> +            }
> +
> +            for (int i = 0; i < bitmapLength; i ++) {
> +                bitmap[i] = 0;
> +            }
> +        }
> +
> +        addToPool();
> +    }
> +
> +    /**
> +     * Returns the bitmap index of the subpage allocation.
> +     */
> +    long allocate() {
> +        if (elemSize == 0) {
> +            return toHandle(0);
> +        }
> +
> +        if (numAvail == 0 || !doNotDestroy) {
> +            return -1;
> +        }
> +
> +        final int bitmapIdx = nextAvail;
> +        int q = bitmapIdx >>> 6;
> +        int r = bitmapIdx & 63;
> +        assert (bitmap[q] >>> r & 1) == 0;
> +        bitmap[q] |= 1L << r;
> +
> +        if (-- numAvail == 0) {
> +            removeFromPool();
> +            nextAvail = -1;
> +        } else {
> +            nextAvail = findNextAvailable();
> +        }
> +
> +        return toHandle(bitmapIdx);
> +    }
> +
> +    /**
> +     * @return {@code true} if this subpage is in use.
> +     *         {@code false} if this subpage is not used by its chunk and
> thus it's OK to be released.
> +     */
> +    boolean free(int bitmapIdx) {
> +
> +        if (elemSize == 0) {
> +            return true;
> +        }
> +
> +        int q = bitmapIdx >>> 6;
> +        int r = bitmapIdx & 63;
> +        assert (bitmap[q] >>> r & 1) != 0;
> +        bitmap[q] ^= 1L << r;
> +
> +        if (numAvail ++ == 0) {
> +            nextAvail = bitmapIdx;
> +            addToPool();
> +            return true;
> +        }
> +
> +        if (numAvail != maxNumElems) {
> +            return true;
> +        } else {
> +            // Subpage not in use (numAvail == maxNumElems)
> +            if (prev == next) {
> +                // Do not remove if this subpage is the only one left in
> the pool.
> +                return true;
> +            }
> +
> +            // Remove this subpage from the pool if there are other
> subpages left in the pool.
> +            doNotDestroy = false;
> +            removeFromPool();
> +            return false;
> +        }
> +    }
> +
> +    private void addToPool() {
> +        PoolSubpageL<T> head = chunk.arena.findSubpagePoolHead(elemSize);
> +        assert prev == null && next == null;
> +        prev = head;
> +        next = head.next;
> +        next.prev = this;
> +        head.next = this;
> +    }
> +
> +    private void removeFromPool() {
> +        assert prev != null && next != null;
> +        prev.next = next;
> +        next.prev = prev;
> +        next = null;
> +        prev = null;
> +    }
> +
> +    private int findNextAvailable() {
> +        int newNextAvail = -1;
> +        loop:
> +        for (int i = 0; i < bitmapLength; i ++) {
> +            long bits = bitmap[i];
> +            if (~bits != 0) {
> +                for (int j = 0; j < 64; j ++) {
> +                    if ((bits & 1) == 0) {
> +                        newNextAvail = i << 6 | j;
> +                        break loop;
> +                    }
> +                    bits >>>= 1;
> +                }
> +            }
> +        }
> +
> +        if (newNextAvail < maxNumElems) {
> +            return newNextAvail;
> +        } else {
> +            return -1;
> +        }
> +    }
> +
> +    private long toHandle(int bitmapIdx) {
> +        return 0x4000000000000000L | (long) bitmapIdx << 32 |
> memoryMapIdx;
> +    }
> +
> +    public String toString() {
> +        if (!doNotDestroy) {
> +            return "(" + memoryMapIdx + ": not in use)";
> +        }
> +
> +        return String.valueOf('(') + memoryMapIdx + ": " + (maxNumElems -
> numAvail) + '/' + maxNumElems +
> +               ", offset: " + runOffset + ", length: " + pageSize + ",
> elemSize: " + elemSize + ')';
> +    }
> +}
>
>
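PoolSubpageL above carves one page into equal-sized elements and tracks them
with a long[] bitmap: allocate() sets a bit, findNextAvailable() scans for the
next clear one. A minimal sketch of that bookkeeping follows (the class name is
illustrative, and the bitmap here is sized to the element count rather than the
fixed pageSize >>> 10 sizing used above):

    // Sketch only: the bitmap bookkeeping PoolSubpageL uses for fixed-size elements.
    public class SubpageBitmapDemo {
        public static void main(String[] args) {
            int pageSize = 8192;
            int elemSize = 512;                       // assumed element size for the demo
            int maxNumElems = pageSize / elemSize;    // 16 elements per page
            long[] bitmap = new long[(maxNumElems + 63) >>> 6];

            // "Allocate" element 0 by setting its bit, as allocate() does.
            int bitmapIdx = 0;
            bitmap[bitmapIdx >>> 6] |= 1L << (bitmapIdx & 63);

            // Scan for the next clear bit, mirroring findNextAvailable().
            int nextAvail = -1;
            outer:
            for (int i = 0; i < bitmap.length; i++) {
                long bits = bitmap[i];
                if (~bits != 0) {
                    for (int j = 0; j < 64; j++) {
                        if ((bits & 1) == 0) { nextAvail = i << 6 | j; break outer; }
                        bits >>>= 1;
                    }
                }
            }
            System.out.println("next free element: " + nextAvail); // 1
        }
    }
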
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java
> new file mode 100644
> index 0000000..9973911
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolThreadCacheL.java
> @@ -0,0 +1,33 @@
> +/*
> + * Copyright 2012 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +import java.nio.ByteBuffer;
> +
> +final class PoolThreadCacheL {
> +
> +    final PoolArenaL<byte[]> heapArena;
> +    final PoolArenaL<ByteBuffer> directArena;
> +
> +    // TODO: Test if adding padding helps under contention
> +    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
> +
> +    PoolThreadCacheL(PoolArenaL<byte[]> heapArena, PoolArenaL<ByteBuffer>
> directArena) {
> +        this.heapArena = heapArena;
> +        this.directArena = directArena;
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
> new file mode 100644
> index 0000000..a05b0d1
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
> @@ -0,0 +1,263 @@
> +/*
> + * Copyright 2012 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +import io.netty.buffer.AbstractByteBufAllocator;
> +import io.netty.buffer.ByteBuf;
> +import io.netty.util.internal.PlatformDependent;
> +import io.netty.util.internal.StringUtil;
> +import io.netty.util.internal.SystemPropertyUtil;
> +import io.netty.util.internal.logging.InternalLogger;
> +import io.netty.util.internal.logging.InternalLoggerFactory;
> +
> +import java.nio.ByteBuffer;
> +import java.util.concurrent.atomic.AtomicInteger;
> +
> +public class PooledByteBufAllocatorL extends AbstractByteBufAllocator {
> +
> +    private static final InternalLogger logger =
> InternalLoggerFactory.getInstance(PooledByteBufAllocatorL.class);
> +
> +    private static final int DEFAULT_NUM_HEAP_ARENA;
> +    private static final int DEFAULT_NUM_DIRECT_ARENA;
> +
> +    private static final int DEFAULT_PAGE_SIZE;
> +    private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB
> per chunk
> +
> +    private static final int MIN_PAGE_SIZE = 4096;
> +    private static final int MAX_CHUNK_SIZE = (int) (((long)
> Integer.MAX_VALUE + 1) / 2);
> +
> +    static {
> +        int defaultPageSize =
> SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
> +        Throwable pageSizeFallbackCause = null;
> +        try {
> +            validateAndCalculatePageShifts(defaultPageSize);
> +        } catch (Throwable t) {
> +            pageSizeFallbackCause = t;
> +            defaultPageSize = 8192;
> +        }
> +        DEFAULT_PAGE_SIZE = defaultPageSize;
> +
> +        int defaultMaxOrder =
> SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
> +        Throwable maxOrderFallbackCause = null;
> +        try {
> +            validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE,
> defaultMaxOrder);
> +        } catch (Throwable t) {
> +            maxOrderFallbackCause = t;
> +            defaultMaxOrder = 11;
> +        }
> +        DEFAULT_MAX_ORDER = defaultMaxOrder;
> +
> +        // Determine reasonable default for nHeapArena and nDirectArena.
> +        // Assuming each arena has 3 chunks, the pool should not consume
> more than 50% of max memory.
> +        final Runtime runtime = Runtime.getRuntime();
> +        final int defaultChunkSize = DEFAULT_PAGE_SIZE <<
> DEFAULT_MAX_ORDER;
> +        DEFAULT_NUM_HEAP_ARENA = Math.max(0,
> +                SystemPropertyUtil.getInt(
> +                        "io.netty.allocator.numHeapArenas",
> +                        (int) Math.min(
> +                                runtime.availableProcessors(),
> +                                Runtime.getRuntime().maxMemory() /
> defaultChunkSize / 2 / 3)));
> +        DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
> +                SystemPropertyUtil.getInt(
> +                        "io.netty.allocator.numDirectArenas",
> +                        (int) Math.min(
> +                                runtime.availableProcessors(),
> +                                PlatformDependent.maxDirectMemory() /
> defaultChunkSize / 2 / 3)));
> +
> +        if (logger.isDebugEnabled()) {
> +            logger.debug("-Dio.netty.allocator.numHeapArenas: {}",
> DEFAULT_NUM_HEAP_ARENA);
> +            logger.debug("-Dio.netty.allocator.numDirectArenas: {}",
> DEFAULT_NUM_DIRECT_ARENA);
> +            if (pageSizeFallbackCause == null) {
> +                logger.debug("-Dio.netty.allocator.pageSize: {}",
> DEFAULT_PAGE_SIZE);
> +            } else {
> +                logger.debug("-Dio.netty.allocator.pageSize: {}",
> DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
> +            }
> +            if (maxOrderFallbackCause == null) {
> +                logger.debug("-Dio.netty.allocator.maxOrder: {}",
> DEFAULT_MAX_ORDER);
> +            } else {
> +                logger.debug("-Dio.netty.allocator.maxOrder: {}",
> DEFAULT_MAX_ORDER, maxOrderFallbackCause);
> +            }
> +            logger.debug("-Dio.netty.allocator.chunkSize: {}",
> DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
> +        }
> +    }
> +
> +    public static final PooledByteBufAllocatorL DEFAULT =
> +            new
> PooledByteBufAllocatorL(PlatformDependent.directBufferPreferred());
> +
> +    private final PoolArenaL<byte[]>[] heapArenas;
> +    private final PoolArenaL<ByteBuffer>[] directArenas;
> +
> +    final ThreadLocal<PoolThreadCacheL> threadCache = new
> ThreadLocal<PoolThreadCacheL>() {
> +        private final AtomicInteger index = new AtomicInteger();
> +        @Override
> +        protected PoolThreadCacheL initialValue() {
> +            final int idx = index.getAndIncrement();
> +            final PoolArenaL<byte[]> heapArena;
> +            final PoolArenaL<ByteBuffer> directArena;
> +
> +            if (heapArenas != null) {
> +                heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
> +            } else {
> +                heapArena = null;
> +            }
> +
> +            if (directArenas != null) {
> +                directArena = directArenas[Math.abs(idx %
> directArenas.length)];
> +            } else {
> +                directArena = null;
> +            }
> +
> +            return new PoolThreadCacheL(heapArena, directArena);
> +        }
> +    };
> +
> +    public PooledByteBufAllocatorL() {
> +        this(false);
> +    }
> +
> +    public PooledByteBufAllocatorL(boolean preferDirect) {
> +        this(preferDirect, DEFAULT_NUM_HEAP_ARENA,
> DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
> +    }
> +
> +    public PooledByteBufAllocatorL(int nHeapArena, int nDirectArena, int
> pageSize, int maxOrder) {
> +        this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
> +    }
> +
> +    public PooledByteBufAllocatorL(boolean preferDirect, int nHeapArena,
> int nDirectArena, int pageSize, int maxOrder) {
> +        super(preferDirect);
> +
> +        final int chunkSize = validateAndCalculateChunkSize(pageSize,
> maxOrder);
> +
> +        if (nHeapArena < 0) {
> +            throw new IllegalArgumentException("nHeapArena: " +
> nHeapArena + " (expected: >= 0)");
> +        }
> +        if (nDirectArena < 0) {
> +            throw new IllegalArgumentException("nDirectArena: " +
> nDirectArena + " (expected: >= 0)");
> +        }
> +
> +        int pageShifts = validateAndCalculatePageShifts(pageSize);
> +
> +        if (nHeapArena > 0) {
> +            heapArenas = newArenaArray(nHeapArena);
> +            for (int i = 0; i < heapArenas.length; i ++) {
> +                heapArenas[i] = new PoolArenaL.HeapArena(this, pageSize,
> maxOrder, pageShifts, chunkSize);
> +            }
> +        } else {
> +            heapArenas = null;
> +        }
> +
> +        if (nDirectArena > 0) {
> +            directArenas = newArenaArray(nDirectArena);
> +            for (int i = 0; i < directArenas.length; i ++) {
> +                directArenas[i] = new PoolArenaL.DirectArena(this,
> pageSize, maxOrder, pageShifts, chunkSize);
> +            }
> +        } else {
> +            directArenas = null;
> +        }
> +    }
> +
> +    @SuppressWarnings("unchecked")
> +    private static <T> PoolArenaL<T>[] newArenaArray(int size) {
> +        return new PoolArenaL[size];
> +    }
> +
> +    private static int validateAndCalculatePageShifts(int pageSize) {
> +        if (pageSize < MIN_PAGE_SIZE) {
> +            throw new IllegalArgumentException("pageSize: " + pageSize +
> " (expected: 4096+)");
> +        }
> +
> +        // Ensure pageSize is power of 2.
> +        boolean found1 = false;
> +        int pageShifts = 0;
> +        for (int i = pageSize; i != 0 ; i >>= 1) {
> +            if ((i & 1) != 0) {
> +                if (!found1) {
> +                    found1 = true;
> +                } else {
> +                    throw new IllegalArgumentException("pageSize: " +
> pageSize + " (expected: power of 2");
> +                }
> +            } else {
> +                if (!found1) {
> +                    pageShifts ++;
> +                }
> +            }
> +        }
> +        return pageShifts;
> +    }
> +
> +    private static int validateAndCalculateChunkSize(int pageSize, int
> maxOrder) {
> +        if (maxOrder > 14) {
> +            throw new IllegalArgumentException("maxOrder: " + maxOrder +
> " (expected: 0-14)");
> +        }
> +
> +        // Ensure the resulting chunkSize does not overflow.
> +        int chunkSize = pageSize;
> +        for (int i = maxOrder; i > 0; i --) {
> +            if (chunkSize > MAX_CHUNK_SIZE / 2) {
> +                throw new IllegalArgumentException(String.format(
> +                        "pageSize (%d) << maxOrder (%d) must not exceed
> %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
> +            }
> +            chunkSize <<= 1;
> +        }
> +        return chunkSize;
> +    }
> +
> +    @Override
> +    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
> {
> +        PoolThreadCacheL cache = threadCache.get();
> +        PoolArenaL<byte[]> heapArena = cache.heapArena;
> +        if (heapArena != null) {
> +            return heapArena.allocate(cache, initialCapacity,
> maxCapacity);
> +        } else {
> +          throw new UnsupportedOperationException();
> +        }
> +    }
> +
> +    @Override
> +    protected ByteBuf newDirectBuffer(int initialCapacity, int
> maxCapacity) {
> +        PoolThreadCacheL cache = threadCache.get();
> +        PoolArenaL<ByteBuffer> directArena = cache.directArena;
> +        if (directArena != null) {
> +            return directArena.allocate(cache, initialCapacity,
> maxCapacity);
> +        } else {
> +            if (PlatformDependent.hasUnsafe()) {
> +              throw new UnsupportedOperationException();
> +//                return new UnpooledUnsafeDirectByteBuf(this,
> initialCapacity, maxCapacity);
> +            } else {
> +              throw new UnsupportedOperationException();
> +//                return new UnpooledDirectByteBuf(this, initialCapacity,
> maxCapacity);
> +            }
> +        }
> +    }
> +
> +    public String toString() {
> +        StringBuilder buf = new StringBuilder();
> +        buf.append(heapArenas.length);
> +        buf.append(" heap arena(s):");
> +        buf.append(StringUtil.NEWLINE);
> +        for (PoolArenaL<byte[]> a: heapArenas) {
> +            buf.append(a);
> +        }
> +        buf.append(directArenas.length);
> +        buf.append(" direct arena(s):");
> +        buf.append(StringUtil.NEWLINE);
> +        for (PoolArenaL<ByteBuffer> a: directArenas) {
> +            buf.append(a);
> +        }
> +        return buf.toString();
> +    }
> +}
>
>
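For anyone wanting to poke at the new allocator, a minimal smoke test might
look like the sketch below. It assumes the netty-bufferl module is on the
classpath and that direct arenas are enabled (the default when direct buffers
are preferred); the demo class name is illustrative. Note that heap allocation
and the unpooled direct fallback currently throw UnsupportedOperationException,
so only the pooled direct path is exercised.

    // Sketch only: allocate, use and release a pooled direct buffer.
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocatorL;

    public class LittleEndianAllocDemo {
        public static void main(String[] args) {
            PooledByteBufAllocatorL allocator = new PooledByteBufAllocatorL(true);

            ByteBuf buf = allocator.directBuffer(64);   // pooled, unsafe-backed direct buffer
            buf.writeInt(42);                           // ByteBuf API stays big-endian
            System.out.println(buf.readInt());          // 42

            buf.release();                              // return the buffer to its arena
        }
    }
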
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
> new file mode 100644
> index 0000000..c25c2e9
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
> @@ -0,0 +1,173 @@
> +/*
> + * Copyright 2012 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +import io.netty.buffer.AbstractReferenceCountedByteBuf;
> +import io.netty.buffer.ByteBuf;
> +import io.netty.buffer.ByteBufAllocator;
> +import io.netty.util.Recycler;
> +import io.netty.util.ResourceLeak;
> +import io.netty.util.ResourceLeakDetector;
> +
> +import java.nio.ByteBuffer;
> +import java.nio.ByteOrder;
> +
> +abstract class PooledByteBufL<T> extends AbstractReferenceCountedByteBuf {
> +
> +    private final ResourceLeak leak;
> +    private final Recycler.Handle recyclerHandle;
> +
> +    protected PoolChunkL<T> chunk;
> +    protected long handle;
> +    protected T memory;
> +    protected int offset;
> +    protected int length;
> +    private int maxLength;
> +
> +    private ByteBuffer tmpNioBuf;
> +
> +    protected PooledByteBufL(Recycler.Handle recyclerHandle, int
> maxCapacity) {
> +        super(maxCapacity);
> +        leak = leakDetector.open(this);
> +        this.recyclerHandle = recyclerHandle;
> +    }
> +
> +    void init(PoolChunkL<T> chunk, long handle, int offset, int length,
> int maxLength) {
> +        assert handle >= 0;
> +        assert chunk != null;
> +
> +        this.chunk = chunk;
> +        this.handle = handle;
> +        memory = chunk.memory;
> +        this.offset = offset;
> +        this.length = length;
> +        this.maxLength = maxLength;
> +        setIndex(0, 0);
> +        tmpNioBuf = null;
> +    }
> +
> +    void initUnpooled(PoolChunkL<T> chunk, int length) {
> +        assert chunk != null;
> +
> +        this.chunk = chunk;
> +        handle = 0;
> +        memory = chunk.memory;
> +        offset = 0;
> +        this.length = maxLength = length;
> +        setIndex(0, 0);
> +        tmpNioBuf = null;
> +    }
> +
> +    @Override
> +    public final int capacity() {
> +        return length;
> +    }
> +
> +    @Override
> +    public final ByteBuf capacity(int newCapacity) {
> +        ensureAccessible();
> +
> +        // If the request capacity does not require reallocation, just
> update the length of the memory.
> +        if (chunk.unpooled) {
> +            if (newCapacity == length) {
> +                return this;
> +            }
> +        } else {
> +            if (newCapacity > length) {
> +                if (newCapacity <= maxLength) {
> +                    length = newCapacity;
> +                    return this;
> +                }
> +            } else if (newCapacity < length) {
> +                if (newCapacity > maxLength >>> 1) {
> +                    if (maxLength <= 512) {
> +                        if (newCapacity > maxLength - 16) {
> +                            length = newCapacity;
> +                            setIndex(Math.min(readerIndex(),
> newCapacity), Math.min(writerIndex(), newCapacity));
> +                            return this;
> +                        }
> +                    } else { // > 512 (i.e. >= 1024)
> +                        length = newCapacity;
> +                        setIndex(Math.min(readerIndex(), newCapacity),
> Math.min(writerIndex(), newCapacity));
> +                        return this;
> +                    }
> +                }
> +            } else {
> +                return this;
> +            }
> +        }
> +
> +        // Reallocation required.
> +        chunk.arena.reallocate(this, newCapacity, true);
> +        return this;
> +    }
> +
> +    @Override
> +    public final ByteBufAllocator alloc() {
> +        return chunk.arena.parent;
> +    }
> +
> +    @Override
> +    public final ByteOrder order() {
> +        return ByteOrder.BIG_ENDIAN;
> +    }
> +
> +    @Override
> +    public final ByteBuf unwrap() {
> +        return null;
> +    }
> +
> +    protected final ByteBuffer internalNioBuffer() {
> +        ByteBuffer tmpNioBuf = this.tmpNioBuf;
> +        if (tmpNioBuf == null) {
> +            this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);
> +        }
> +        return tmpNioBuf;
> +    }
> +
> +    protected abstract ByteBuffer newInternalNioBuffer(T memory);
> +
> +    @Override
> +    protected final void deallocate() {
> +        if (handle >= 0) {
> +            final long handle = this.handle;
> +            this.handle = -1;
> +            memory = null;
> +            chunk.arena.free(chunk, handle);
> +            if (ResourceLeakDetector.ENABLED) {
> +                leak.close();
> +            } else {
> +                recycle();
> +            }
> +        }
> +    }
> +
> +    @SuppressWarnings("unchecked")
> +    private void recycle() {
> +        Recycler.Handle recyclerHandle = this.recyclerHandle;
> +        if (recyclerHandle != null) {
> +            setRefCnt(1);
> +            ((Recycler<Object>) recycler()).recycle(this, recyclerHandle);
> +        }
> +    }
> +
> +    protected abstract Recycler<?> recycler();
> +
> +    protected final int idx(int index) {
> +        return offset + index;
> +    }
> +}
>
>
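The capacity() override above tries hard to avoid a reallocation: a pooled
buffer can grow into the slack of its underlying run (maxLength), and can
shrink in place as long as it still uses more than half of that run (with a
16-byte slack rule for runs of 512 bytes or less). A small sketch of that
decision, pooled case only (class and method names are illustrative):

    // Sketch only: when PooledByteBufL.capacity() can resize without reallocating.
    public class ResizeRuleDemo {
        static boolean canResizeInPlace(int length, int maxLength, int newCapacity) {
            if (newCapacity == length) return true;
            if (newCapacity > length) return newCapacity <= maxLength;   // grow into the run's slack
            if (newCapacity > maxLength >>> 1) {                         // shrink, still over half used
                return maxLength > 512 || newCapacity > maxLength - 16;  // small runs keep a 16-byte rule
            }
            return false;                                                // otherwise reallocate
        }

        public static void main(String[] args) {
            System.out.println(canResizeInPlace(1024, 2048, 1500)); // true  (fits the run)
            System.out.println(canResizeInPlace(2048, 2048, 900));  // false (would waste over half the run)
        }
    }
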
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
> new file mode 100644
> index 0000000..949f9fb
> --- /dev/null
> +++
> b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
> @@ -0,0 +1,342 @@
> +/*
> + * Copyright 2013 The Netty Project
> + *
> + * The Netty Project licenses this file to you under the Apache License,
> + * version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at:
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> WITHOUT
> + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
> the
> + * License for the specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package io.netty.buffer;
> +
> +import io.netty.buffer.ByteBuf;
> +import io.netty.util.Recycler;
> +import io.netty.util.internal.PlatformDependent;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.nio.ByteBuffer;
> +import java.nio.ByteOrder;
> +import java.nio.channels.ClosedChannelException;
> +import java.nio.channels.GatheringByteChannel;
> +import java.nio.channels.ScatteringByteChannel;
> +
> +final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer>
> {
> +
> +    private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder()
> == ByteOrder.LITTLE_ENDIAN;
> +
> +    private static final Recycler<PooledUnsafeDirectByteBufL> RECYCLER =
> new Recycler<PooledUnsafeDirectByteBufL>() {
> +        @Override
> +        protected PooledUnsafeDirectByteBufL newObject(Handle handle) {
> +            return new PooledUnsafeDirectByteBufL(handle, 0);
> +        }
> +    };
> +
> +    static PooledUnsafeDirectByteBufL newInstance(int maxCapacity) {
> +        PooledUnsafeDirectByteBufL buf = RECYCLER.get();
> +        buf.maxCapacity(maxCapacity);
> +        return buf;
> +    }
> +
> +    private long memoryAddress;
> +
> +    private PooledUnsafeDirectByteBufL(Recycler.Handle recyclerHandle,
> int maxCapacity) {
> +        super(recyclerHandle, maxCapacity);
> +    }
> +
> +    @Override
> +    void init(PoolChunkL<ByteBuffer> chunk, long handle, int offset, int
> length, int maxLength) {
> +        super.init(chunk, handle, offset, length, maxLength);
> +        initMemoryAddress();
> +    }
> +
> +    @Override
> +    void initUnpooled(PoolChunkL<ByteBuffer> chunk, int length) {
> +        super.initUnpooled(chunk, length);
> +        initMemoryAddress();
> +    }
> +
> +    private void initMemoryAddress() {
> +        memoryAddress = PlatformDependent.directBufferAddress(memory) +
> offset;
> +    }
> +
> +    @Override
> +    protected ByteBuffer newInternalNioBuffer(ByteBuffer memory) {
> +        return memory.duplicate();
> +    }
> +
> +    @Override
> +    public boolean isDirect() {
> +        return true;
> +    }
> +
> +    @Override
> +    protected byte _getByte(int index) {
> +        return PlatformDependent.getByte(addr(index));
> +    }
> +
> +    @Override
> +    protected short _getShort(int index) {
> +        short v = PlatformDependent.getShort(addr(index));
> +        return NATIVE_ORDER? v : Short.reverseBytes(v);
> +    }
> +
> +    @Override
> +    protected int _getUnsignedMedium(int index) {
> +        long addr = addr(index);
> +        return (PlatformDependent.getByte(addr) & 0xff) << 16 |
> +                (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
> +                PlatformDependent.getByte(addr + 2) & 0xff;
> +    }
> +
> +    @Override
> +    protected int _getInt(int index) {
> +        int v = PlatformDependent.getInt(addr(index));
> +        return NATIVE_ORDER? v : Integer.reverseBytes(v);
> +    }
> +
> +    @Override
> +    protected long _getLong(int index) {
> +        long v = PlatformDependent.getLong(addr(index));
> +        return NATIVE_ORDER? v : Long.reverseBytes(v);
> +    }
> +
> +    @Override
> +    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int
> length) {
> +        checkIndex(index, length);
> +        if (dst == null) {
> +            throw new NullPointerException("dst");
> +        }
> +        if (dstIndex < 0 || dstIndex > dst.capacity() - length) {
> +            throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
> +        }
> +
> +        if (length != 0) {
> +            if (dst.hasMemoryAddress()) {
> +                PlatformDependent.copyMemory(addr(index),
> dst.memoryAddress() + dstIndex, length);
> +            } else if (dst.hasArray()) {
> +                PlatformDependent.copyMemory(addr(index), dst.array(),
> dst.arrayOffset() + dstIndex, length);
> +            } else {
> +                dst.setBytes(dstIndex, this, index, length);
> +            }
> +        }
> +        return this;
> +    }
> +
> +    @Override
> +    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int
> length) {
> +        checkIndex(index, length);
> +        if (dst == null) {
> +            throw new NullPointerException("dst");
> +        }
> +        if (dstIndex < 0 || dstIndex > dst.length - length) {
> +            throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
> +        }
> +        if (length != 0) {
> +            PlatformDependent.copyMemory(addr(index), dst, dstIndex,
> length);
> +        }
> +        return this;
> +    }
> +
> +    @Override
> +    public ByteBuf getBytes(int index, ByteBuffer dst) {
> +        checkIndex(index);
> +        int bytesToCopy = Math.min(capacity() - index, dst.remaining());
> +        ByteBuffer tmpBuf = internalNioBuffer();
> +        index = idx(index);
> +        tmpBuf.clear().position(index).limit(index + bytesToCopy);
> +        dst.put(tmpBuf);
> +        return this;
> +    }
> +
> +    @Override
> +    public ByteBuf getBytes(int index, OutputStream out, int length)
> throws IOException {
> +        checkIndex(index, length);
> +        if (length != 0) {
> +            byte[] tmp = new byte[length];
> +            PlatformDependent.copyMemory(addr(index), tmp, 0, length);
> +            out.write(tmp);
> +        }
> +        return this;
> +    }
> +
> +    @Override
> +    public int getBytes(int index, GatheringByteChannel out, int length)
> throws IOException {
> +        checkIndex(index, length);
> +        if (length == 0) {
> +            return 0;
> +        }
> +
> +        ByteBuffer tmpBuf = internalNioBuffer();
> +        index = idx(index);
> +        tmpBuf.clear().position(index).limit(index + length);
> +        return out.write(tmpBuf);
> +    }
> +
> +    @Override
> +    protected void _setByte(int index, int value) {
> +        PlatformDependent.putByte(addr(index), (byte) value);
> +    }
> +
> +    @Override
> +    protected void _setShort(int index, int value) {
> +        PlatformDependent.putShort(addr(index), NATIVE_ORDER ? (short)
> value : Short.reverseBytes((short) value));
> +    }
> +
> +    @Override
> +    protected void _setMedium(int index, int value) {
> +        long addr = addr(index);
> +        PlatformDependent.putByte(addr, (byte) (value >>> 16));
> +        PlatformDependent.putByte(addr + 1, (byte) (value >>> 8));
> +        PlatformDependent.putByte(addr + 2, (byte) value);
> +    }
> +
> +    @Override
> +    protected void _setInt(int index, int value) {
> +        PlatformDependent.putInt(addr(index), NATIVE_ORDER ? value :
> Integer.reverseBytes(value));
> +    }
> +
> +    @Override
> +    protected void _setLong(int index, long value) {
> +        PlatformDependent.putLong(addr(index), NATIVE_ORDER ? value :
> Long.reverseBytes(value));
> +    }
> +
> +    @Override
> +    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int
> length) {
> +        checkIndex(index, length);
> +        if (src == null) {
> +            throw new NullPointerException("src");
> +        }
> +        if (srcIndex < 0 || srcIndex > src.capacity() - length) {
> +            throw new IndexOutOfBoundsException("srcIndex: " + srcIndex);
> +        }
> +
> +        if (length != 0) {
> +            if (src.hasMemoryAddress()) {
> +                PlatformDependent.copyMemory(src.memoryAddress() +
> srcIndex, addr(index), length);
> +            } else if (src.hasArray()) {
> +                PlatformDependent.copyMemory(src.array(),
> src.arrayOffset() + srcIndex, addr(index), length);
> +            } else {
> +                src.getBytes(srcIndex, this, index, length);
> +            }
> +        }
> +        return this;
> +    }
> +
> +    @Override
> +    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int
> length) {
> +        checkIndex(index, length);
> +        if (length != 0) {
> +            PlatformDependent.copyMemory(src, srcIndex, addr(index),
> length);
> +        }
> +        return this;
> +    }
> +
> +    @Override
> +    public ByteBuf setBytes(int index, ByteBuffer src) {
> +        checkIndex(index);
> +        ByteBuffer tmpBuf = internalNioBuffer();
> +        if (src == tmpBuf) {
> +            src = src.duplicate();
> +        }
> +
> +        index = idx(index);
> +        tmpBuf.clear().position(index).limit(index + src.remaining());
> +        tmpBuf.put(src);
> +        return this;
> +    }
> +
> +    @Override
> +    public int setBytes(int index, InputStream in, int length) throws
> IOException {
> +        checkIndex(index, length);
> +        byte[] tmp = new byte[length];
> +        int readBytes = in.read(tmp);
> +        if (readBytes > 0) {
> +            PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes);
> +        }
> +        return readBytes;
> +    }
> +
> +    @Override
> +    public int setBytes(int index, ScatteringByteChannel in, int length)
> throws IOException {
> +        checkIndex(index, length);
> +        ByteBuffer tmpNioBuf = internalNioBuffer();
> +        index = idx(index);
> +        tmpNioBuf.clear().position(index).limit(index + length);
> +        try {
> +            return in.read(tmpNioBuf);
> +        } catch (ClosedChannelException e) {
> +            return -1;
> +        }
> +    }
> +
> +    @Override
> +    public ByteBuf copy(int index, int length) {
> +        checkIndex(index, length);
> +        PooledUnsafeDirectByteBufL copy = (PooledUnsafeDirectByteBufL)
> alloc().directBuffer(length, maxCapacity());
> +        if (length != 0) {
> +            PlatformDependent.copyMemory(addr(index), copy.addr(0),
> length);
> +            copy.setIndex(0, length);
> +        }
> +        return copy;
> +    }
> +
> +    @Override
> +    public int nioBufferCount() {
> +        return 1;
> +    }
> +
> +    @Override
> +    public ByteBuffer[] nioBuffers(int index, int length) {
> +        return new ByteBuffer[] { nioBuffer(index, length) };
> +    }
> +
> +    @Override
> +    public ByteBuffer internalNioBuffer(int index, int length) {
> +        checkIndex(index, length);
> +        index = idx(index);
> +        return (ByteBuffer)
> internalNioBuffer().clear().position(index).limit(index + length);
> +    }
> +
> +    @Override
> +    public boolean hasArray() {
> +        return false;
> +    }
> +
> +    @Override
> +    public byte[] array() {
> +        throw new UnsupportedOperationException("direct buffer");
> +    }
> +
> +    @Override
> +    public int arrayOffset() {
> +        throw new UnsupportedOperationException("direct buffer");
> +    }
> +
> +    @Override
> +    public boolean hasMemoryAddress() {
> +        return true;
> +    }
> +
> +    @Override
> +    public long memoryAddress() {
> +        return memoryAddress;
> +    }
> +
> +    private long addr(int index) {
> +        return memoryAddress + index;
> +    }
> +
> +    @Override
> +    protected Recycler<?> recycler() {
> +        return RECYCLER;
> +    }
> +}
>
>
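The NATIVE_ORDER flag above is what lets this buffer keep its data in native
(little-endian on x86) order while the ByteBuf API continues to present
big-endian values: _setInt()/_getInt() byte-swap only when the platform order
is little-endian. A small sketch of the same idea using a plain NIO buffer
(the class name is illustrative):

    // Sketch only: writing in native order plus a conditional swap still yields
    // big-endian bytes in memory, which is what _setInt()/_getInt() above rely on.
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class NativeOrderDemo {
        public static void main(String[] args) {
            boolean nativeIsLittle = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

            // Write in native order, swapping first on little-endian platforms,
            // the way _setInt() does before PlatformDependent.putInt().
            ByteBuffer raw = ByteBuffer.allocateDirect(4).order(ByteOrder.nativeOrder());
            int value = 0x01020304;
            raw.putInt(0, nativeIsLittle ? Integer.reverseBytes(value) : value);

            // Reading the same bytes big-endian recovers the logical value.
            int logical = raw.order(ByteOrder.BIG_ENDIAN).getInt(0);
            System.out.println(Integer.toHexString(logical)); // 1020304
        }
    }
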
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/pom.xml
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/pom.xml
> b/sandbox/prototype/exec/java-exec/pom.xml
> index 63bf4ea..15d489d 100644
> --- a/sandbox/prototype/exec/java-exec/pom.xml
> +++ b/sandbox/prototype/exec/java-exec/pom.xml
> @@ -59,6 +59,11 @@
>        <version>1.0-SNAPSHOT</version>
>      </dependency>
>      <dependency>
> +      <groupId>org.apache.drill.exec</groupId>
> +      <version>4.0.3.Final</version>
> +      <artifactId>netty-bufferl</artifactId>
> +    </dependency>
> +    <dependency>
>        <groupId>org.apache.drill</groupId>
>        <artifactId>common</artifactId>
>        <version>1.0-SNAPSHOT</version>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
> index 6ffa968..147762e 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
> @@ -118,12 +118,12 @@ public final class ${minor.class}Vector extends
> BaseDataValueVector implements F
>      }
>    }
>
> -  public void copyValue(int inIndex, int outIndex, ${minor.class}Vector
> v){
> +  public void copyFrom(int inIndex, int outIndex, ${minor.class}Vector v){
>      <#if (type.width > 8)>
>      data.getBytes(inIndex * ${type.width}, v.data, outIndex *
> ${type.width}, ${type.width});
>      <#else> <#-- type.width <= 8 -->
>      data.set${(minor.javaType!type.javaType)?cap_first}(outIndex *
> ${type.width},
> -        data.get${(minor.javaType!type.javaType)?cap_first}(inIndex *
> ${type.width})
> +        v.data.get${(minor.javaType!type.javaType)?cap_first}(inIndex *
> ${type.width})
>      );
>      </#if> <#-- type.width -->
>    }
>
>
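The small-looking change above is the important one: the template previously read the value back out of the destination's own buffer, so copyFrom copied nothing. A stand-alone sketch with plain ByteBufs showing the corrected data flow for a 4-byte-wide type (class and method names here are illustrative, not the generated ones):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class CopyFromSketch {
        // Mirrors the <= 8 byte branch after expansion, with ${type.width} == 4.
        static void copyFrom(ByteBuf dest, ByteBuf src, int inIndex, int outIndex) {
            dest.setInt(outIndex * 4, src.getInt(inIndex * 4)); // read from the source, not from dest
        }

        public static void main(String[] args) {
            ByteBuf src = Unpooled.buffer(16);
            ByteBuf dest = Unpooled.buffer(16);
            src.setInt(2 * 4, 7);               // value 7 at record index 2 of the source
            copyFrom(dest, src, 2, 0);          // copy source record 2 into destination record 0
            System.out.println(dest.getInt(0)); // prints 7
        }
    }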
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
> index e2387a7..644019e 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
> @@ -199,9 +199,9 @@ public final class ${className} extends
> BaseValueVector implements <#if type.maj
>    }
>
>
> -  public void copyValue(int inIndex, int outIndex,
> Nullable${minor.class}Vector v){
> -    bits.copyValue(inIndex, outIndex, v.bits);
> -    values.copyValue(inIndex, outIndex, v.values);
> +  public void copyFrom(int inIndex, int outIndex,
> Nullable${minor.class}Vector v){
> +    bits.copyFrom(inIndex, outIndex, v.bits);
> +    values.copyFrom(inIndex, outIndex, v.values);
>    }
>
>    public final class Accessor implements ValueVector.Accessor{
>
>
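The nullable variant has to move two things per record: the null-indicator bit and the payload; copying one without the other would silently corrupt nullability. A toy stand-in using plain arrays to show that invariant (the real generated code delegates to the paired bits/values vectors exactly as the hunk shows):

    public class NullablePairSketch {
        static boolean[] bits = new boolean[4];   // stands in for the generated 'bits' vector
        static int[] values = new int[4];         // stands in for the generated 'values' vector

        static void copyFrom(int in, int out, boolean[] srcBits, int[] srcValues) {
            bits[out] = srcBits[in];      // null-indicator for this record
            values[out] = srcValues[in];  // the payload itself
        }

        public static void main(String[] args) {
            boolean[] srcBits = {true, false, true, true};
            int[] srcValues = {7, 0, 9, 3};
            copyFrom(2, 0, srcBits, srcValues);
            System.out.println(bits[0] + " " + values[0]); // true 9
        }
    }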
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
> index 99b24de..26564c1 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
> @@ -91,7 +91,7 @@ import com.google.common.collect.Lists;
>      }
>    }
>
> -  public void copyValue(int inIndex, int outIndex,
> Repeated${minor.class}Vector v){
> +  public void copyFrom(int inIndex, int outIndex,
> Repeated${minor.class}Vector v){
>      throw new UnsupportedOperationException();
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
> index 40363a7..95965f7 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
> @@ -119,7 +119,7 @@ public final class ${minor.class}Vector extends
> BaseDataValueVector implements V
>      clear();
>    }
>
> -  public void copyValue(int inIndex, int outIndex, ${minor.class}Vector
> v){
> +  public void copyFrom(int inIndex, int outIndex, ${minor.class}Vector v){
>      int start =
> offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(inIndex);
>      int end =
> offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(inIndex+1);
>      int len = end - start;
>
>
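Variable-width vectors cannot copy a fixed number of bytes per record; the byte range for record inIndex has to be derived from the offset vector first, which is what the visible part of the hunk does. The hunk is truncated before the actual byte copy, so this stand-alone sketch only mirrors the offset arithmetic (plain arrays, illustrative names):

    public class VarLenOffsetSketch {
        public static void main(String[] args) {
            // Offsets for three records: record i occupies bytes [offsets[i], offsets[i + 1]).
            int[] offsets = {0, 3, 8, 12};
            int inIndex = 1;
            int start = offsets[inIndex];     // 3
            int end = offsets[inIndex + 1];   // 8
            int len = end - start;            // 5 bytes would be copied for this record
            System.out.println("start=" + start + " end=" + end + " len=" + len);
        }
    }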
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
> index b2503c1..1b49d54 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
> @@ -18,12 +18,11 @@
>  package org.apache.drill.exec.client;
>
>  import static com.google.common.base.Preconditions.checkState;
> -import static com.google.common.collect.Iterables.get;
>  import static
> org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
>  import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
>  import io.netty.buffer.ByteBuf;
>  import io.netty.buffer.ByteBufAllocator;
> -import io.netty.buffer.PooledByteBufAllocator;
> +import io.netty.buffer.PooledByteBufAllocatorL;
>  import io.netty.channel.nio.NioEventLoopGroup;
>
>  import java.io.Closeable;
> @@ -57,8 +56,9 @@ public class DrillClient implements Closeable{
>
>    DrillConfig config;
>    private UserClient client;
> -  private ClusterCoordinator clusterCoordinator;
> -
> +  private volatile ClusterCoordinator clusterCoordinator;
> +  private volatile boolean connected = false;
> +
>    public DrillClient() {
>      this(DrillConfig.create());
>    }
> @@ -82,25 +82,33 @@ public class DrillClient implements Closeable{
>     *
>     * @throws IOException
>     */
> -  public void connect() throws Exception {
> +  public synchronized void connect() throws RpcException {
> +    if(connected) return;
> +
>      if(clusterCoordinator == null){
> -      this.clusterCoordinator = new ZKClusterCoordinator(this.config);
> -      this.clusterCoordinator.start(10000);
> +      try {
> +        this.clusterCoordinator = new ZKClusterCoordinator(this.config);
> +        this.clusterCoordinator.start(10000);
> +      } catch (Exception e) {
> +        throw new RpcException("Failure setting up ZK for client.", e);
> +      }
> +
>      }
>
>      Collection<DrillbitEndpoint> endpoints =
> clusterCoordinator.getAvailableEndpoints();
>      checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
>      // just use the first endpoint for now
>      DrillbitEndpoint endpoint = endpoints.iterator().next();
> -    ByteBufAllocator bb = new PooledByteBufAllocator(true);
> +    ByteBufAllocator bb = new PooledByteBufAllocatorL(true);
>      this.client = new UserClient(bb, new NioEventLoopGroup(1, new
> NamedThreadFactory("Client-")));
>      try {
>        logger.debug("Connecting to server {}:{}", endpoint.getAddress(),
> endpoint.getUserPort());
>        FutureHandler f = new FutureHandler();
>        this.client.connect(f, endpoint);
>        f.checkedGet();
> +      connected = true;
>      } catch (InterruptedException e) {
> -      throw new IOException(e);
> +      throw new RpcException(e);
>      }
>    }
>
> @@ -111,6 +119,7 @@ public class DrillClient implements Closeable{
>     */
>    public void close() throws IOException {
>      this.client.close();
> +    connected = false;
>    }
>
>    /**
> @@ -124,7 +133,17 @@ public class DrillClient implements Closeable{
>      ListHoldingResultsListener listener = new
> ListHoldingResultsListener();
>      client.submitQuery(listener,
> newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
>      return listener.getResults();
> -
> +  }
> +
> +  /**
> +   * Submits a plan of the given type for direct execution (bypasses parsing);
> +   * results are streamed to the supplied listener.
> +   * @param type the type of the submitted plan
> +   * @param plan the plan to execute
> +   * @param resultsListener listener that receives result batches as they arrive
> +   */
> +  public void runQuery(QueryType type, String plan, UserResultsListener
> resultsListener){
> +    client.submitQuery(resultsListener,
> newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
>    }
>
>    private class ListHoldingResultsListener implements UserResultsListener
> {
>
>
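Taken together these changes give DrillClient an idempotent, RpcException-typed connect() plus a non-blocking submission path. A rough usage sketch of the new overload; only connect(), close() and the runQuery signatures come from the diff, while the listener's package and the plan contents are assumptions:

    import org.apache.drill.exec.client.DrillClient;
    import org.apache.drill.exec.proto.UserProtos.QueryType;
    import org.apache.drill.exec.rpc.user.UserResultsListener; // assumed location of the listener interface

    public class ClientSketch {
        // 'listener' is a caller-provided UserResultsListener implementation (not shown here).
        static void run(DrillClient client, UserResultsListener listener, String logicalPlanJson)
                throws Exception {
            client.connect();                                               // now synchronized; no-op if already connected
            client.runQuery(QueryType.LOGICAL, logicalPlanJson, listener);  // async: batches stream to the listener
            // Wait for the listener to report completion (application-specific) before closing.
            client.close();
        }
    }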
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
> index 5130f2b..8454bb7 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
> @@ -22,9 +22,9 @@ public class FunctionImplementationRegistry {
>        FunctionHolder holder = converter.getHolder(clazz);
>        if(holder != null){
>          methods.put(holder.getFunctionName(), holder);
> -        logger.debug("Registering function {}", holder);
> +//        logger.debug("Registering function {}", holder);
>        }else{
> -        logger.debug("Unable to initialize function for class {}",
> clazz.getName());
> +        logger.warn("Unable to initialize function for class {}",
> clazz.getName());
>        }
>      }
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
> ----------------------------------------------------------------------
> diff --git
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
> index 027dae6..499a693 100644
> ---
> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
> +++
> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
> @@ -19,20 +19,21 @@ package org.apache.drill.exec.memory;
>
>  import io.netty.buffer.ByteBuf;
>  import io.netty.buffer.ByteBufAllocator;
> -import io.netty.buffer.PooledByteBufAllocator;
> +import io.netty.buffer.PooledByteBufAllocatorL;
>
> -public class DirectBufferAllocator extends BufferAllocator{
> +public class DirectBufferAllocator extends BufferAllocator {
>    static final org.slf4j.Logger logger =
> org.slf4j.LoggerFactory.getLogger(DirectBufferAllocator.class);
>
> -  private final PooledByteBufAllocator buffer = new
> PooledByteBufAllocator(true);
> -
> +  private final PooledByteBufAllocatorL buffer = new
> PooledByteBufAllocatorL(true);
> +
> +  public DirectBufferAllocator() {
> +  }
> +
>    @Override
>    public ByteBuf buffer(int size) {
>      // TODO: wrap it
>      return buffer.directBuffer(size);
>    }
> -
> -
>
>    @Override
>    protected boolean pre(int bytes) {
> @@ -40,8 +41,6 @@ public class DirectBufferAllocator extends
> BufferAllocator{
>      return true;
>    }
>
> -
> -
>    @Override
>    public long getAllocatedMemory() {
>      return 0;
> @@ -52,11 +51,9 @@ public class DirectBufferAllocator extends
> BufferAllocator{
>      return buffer;
>    }
>
> -
> -
>    @Override
>    public BufferAllocator getChildAllocator(long initialReservation, long
> maximumReservation) {
> -    //TODO: Add child account allocator.
> +    // TODO: Add child account allocator.
>      return this;
>    }
>
> @@ -64,5 +61,5 @@ public class DirectBufferAllocator extends
> BufferAllocator{
>    public void close() {
>      // TODO: collect all buffers and release them away using a weak
> hashmap so we don't impact pool work
>    }
> -
> +
>  }
>
>
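With DirectBufferAllocator now delegating to PooledByteBufAllocatorL, Drill-side buffer requests come back as the little-endian pooled buffers defined earlier in this commit. A minimal write/read round trip against the allocator (illustrative only):

    import io.netty.buffer.ByteBuf;
    import org.apache.drill.exec.memory.DirectBufferAllocator;

    public class AllocatorSketch {
        public static void main(String[] args) {
            DirectBufferAllocator allocator = new DirectBufferAllocator();
            ByteBuf buf = allocator.buffer(16);  // backed by PooledByteBufAllocatorL after this change
            buf.writeInt(1);
            System.out.println(buf.getByte(0));  // 1 if the buffer is little-endian, as this commit intends
            buf.release();
            allocator.close();
        }
    }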


-- 
Regards,
Tanujit