You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2016/02/26 06:54:02 UTC

[05/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
deleted file mode 100644
index 1cf454d..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFrameBufferManager {
-
-    /**
-     * Reset the counters and flags to initial status. This method should not release the pre-allocated resources
-     *
-     * @throws org.apache.hyracks.api.exceptions.HyracksDataException
-     */
-    void reset() throws HyracksDataException;
-
-    /**
-     * @param frameIndex
-     * @return the specified frame, from the set of memory buffers, being
-     * managed by this memory manager
-     */
-    ByteBuffer getFrame(int frameIndex);
-
-    /**
-     * Get the startOffset of the specific frame inside buffer
-     *
-     * @param frameIndex
-     * @return the start offset of the frame returned by {@link #getFrame(int)} method.
-     */
-    int getFrameStartOffset(int frameIndex);
-
-    /**
-     * Get the size of the specific frame inside buffer
-     *
-     * @param frameIndex
-     * @return the length of the specific frame
-     */
-    int getFrameSize(int frameIndex);
-
-    /**
-     * @return the number of frames in this buffer
-     */
-    int getNumFrames();
-
-    /**
-     * Writes the whole frame into the buffer.
-     *
-     * @param frame source frame
-     * @return the id of the inserted frame. if failed to return it will be -1.
-     */
-    int insertFrame(ByteBuffer frame) throws HyracksDataException;
-
-    void close();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
deleted file mode 100644
index fab1706..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-public interface IFrameFreeSlotPolicy {
-
-    /**
-     * Find the best fit frame id which can hold the data, and then pop it out from the index.
-     * Return -1 is failed to find any.
-     *
-     * @param tobeInsertedSize the actual size of the data which should include
-     *                         the meta data like the field offset and the tuple
-     *                         count extra size
-     * @return the best fit frame id
-     */
-    int popBestFit(int tobeInsertedSize);
-
-    /**
-     * Register the new free slot into the index
-     *
-     * @param frameID
-     * @param freeSpace
-     */
-    void pushNewFrame(int frameID, int freeSpace);
-
-    /**
-     * Clear all the existing free slot information.
-     */
-    void reset();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
deleted file mode 100644
index f555971..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFramePool {
-
-    int getMinFrameSize();
-
-    int getMemoryBudgetBytes();
-
-    /**
-     * Get a frame of given size. <br>
-     * Returns {@code null} if failed to allocate the required size of frame
-     *
-     * @param frameSize the actual size of the frame.
-     * @return the allocated frame
-     * @throws HyracksDataException
-     */
-    ByteBuffer allocateFrame(int frameSize) throws HyracksDataException;
-
-    /**
-     * Reset the counters to initial status. This method should not release the pre-allocated resources.
-     */
-    void reset();
-
-    /**
-     * Release the pre-allocated resources.
-     */
-    void close();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
deleted file mode 100644
index 00decb9..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public interface ITupleBufferAccessor {
-
-    void reset(TuplePointer tuplePointer);
-
-    ByteBuffer getTupleBuffer();
-
-    int getTupleStartOffset();
-
-    int getTupleLength();
-
-    int getAbsFieldStartOffset(int fieldId);
-
-    int getFieldLength(int fieldId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
deleted file mode 100644
index ae502a0..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public interface ITupleBufferManager {
-    /**
-     * Reset the counters and flags to initial status. This method should not release the pre-allocated resources
-     *
-     * @throws org.apache.hyracks.api.exceptions.HyracksDataException
-     */
-    void reset() throws HyracksDataException;
-
-    /**
-     * @return the number of tuples in this buffer
-     */
-    int getNumTuples();
-
-    boolean insertTuple(IFrameTupleAccessor accessor, int idx, TuplePointer tuplePointer) throws HyracksDataException;
-
-    void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException;
-
-    void close();
-
-    ITupleBufferAccessor getTupleAccessor();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
deleted file mode 100644
index 444b0b6..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hyracks.api.comm.FixedSizeFrame;
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
-
-public class VariableFrameMemoryManager implements IFrameBufferManager {
-
-    private class PhysicalFrameOffset {
-        IFrame physicalFrame;
-        int physicalOffset;
-
-        PhysicalFrameOffset(IFrame frame, int offset) {
-            physicalFrame = frame;
-            physicalOffset = offset;
-        }
-    }
-
-    private class LogicalFrameStartSize {
-        ByteBuffer logicalFrame;
-        int logicalStart;
-        int logicalSize;
-
-        LogicalFrameStartSize(ByteBuffer frame, int start, int size) {
-            logicalFrame = frame;
-            logicalStart = start;
-            logicalSize = size;
-        }
-    }
-
-    private final IFramePool framePool;
-    private List<PhysicalFrameOffset> physicalFrameOffsets;
-    private List<LogicalFrameStartSize> logicalFrameStartSizes;
-    private final IFrameFreeSlotPolicy freeSlotPolicy;
-
-    public VariableFrameMemoryManager(IFramePool framePool, IFrameFreeSlotPolicy freeSlotPolicy) {
-        this.framePool = framePool;
-        this.freeSlotPolicy = freeSlotPolicy;
-        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
-        this.physicalFrameOffsets = new ArrayList<>(maxFrames);
-        this.logicalFrameStartSizes = new ArrayList<>(maxFrames);
-    }
-
-    private int findAvailableFrame(int frameSize) throws HyracksDataException {
-        int frameId = freeSlotPolicy.popBestFit(frameSize);
-        if (frameId >= 0) {
-            return frameId;
-        }
-        ByteBuffer buffer = framePool.allocateFrame(frameSize);
-        if (buffer != null) {
-            IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), 0);
-            physicalFrameOffsets.add(new PhysicalFrameOffset(new FixedSizeFrame(buffer), 0));
-            return physicalFrameOffsets.size() - 1;
-        }
-        return -1;
-    }
-
-    @Override
-    public void reset() throws HyracksDataException {
-        physicalFrameOffsets.clear();
-        logicalFrameStartSizes.clear();
-        freeSlotPolicy.reset();
-        framePool.reset();
-    }
-
-    @Override
-    public ByteBuffer getFrame(int frameIndex) {
-        return logicalFrameStartSizes.get(frameIndex).logicalFrame;
-    }
-
-    @Override
-    public int getFrameStartOffset(int frameIndex) {
-        return logicalFrameStartSizes.get(frameIndex).logicalStart;
-    }
-
-    @Override
-    public int getFrameSize(int frameIndex) {
-        return logicalFrameStartSizes.get(frameIndex).logicalSize;
-    }
-
-    @Override
-    public int getNumFrames() {
-        return logicalFrameStartSizes.size();
-    }
-
-    @Override
-    public int insertFrame(ByteBuffer frame) throws HyracksDataException {
-        int frameSize = frame.capacity();
-        int physicalFrameId = findAvailableFrame(frameSize);
-        if (physicalFrameId < 0) {
-            return -1;
-        }
-        ByteBuffer buffer = physicalFrameOffsets.get(physicalFrameId).physicalFrame.getBuffer();
-        int offset = physicalFrameOffsets.get(physicalFrameId).physicalOffset;
-        System.arraycopy(frame.array(), 0, buffer.array(), offset, frameSize);
-        if (offset + frameSize < buffer.capacity()) {
-            freeSlotPolicy.pushNewFrame(physicalFrameId, buffer.capacity() - offset - frameSize);
-        }
-        physicalFrameOffsets.get(physicalFrameId).physicalOffset = offset + frameSize;
-        logicalFrameStartSizes.add(new LogicalFrameStartSize(buffer, offset, frameSize));
-        return logicalFrameStartSizes.size() - 1;
-    }
-
-    @Override
-    public void close() {
-        physicalFrameOffsets.clear();
-        logicalFrameStartSizes.clear();
-        freeSlotPolicy.reset();
-        framePool.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
deleted file mode 100644
index 344f961..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class VariableFramePool implements IFramePool {
-    public static final int UNLIMITED_MEMORY = -1;
-
-    private final IHyracksFrameMgrContext ctx;
-    private final int minFrameSize;
-    private final int memBudget;
-
-    private int allocateMem;
-    private ArrayList<ByteBuffer> buffers;  // the unused slots were sorted by size increasingly.
-    private BitSet used; // the merged one also marked as used.
-
-    /**
-     * The constructor of the VariableFramePool.
-     *
-     * @param ctx
-     * @param memBudgetInBytes the given memory budgets to allocate the frames. If it less than 0, it will be treated as unlimited budgets
-     */
-    public VariableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) {
-        this.ctx = ctx;
-        this.minFrameSize = ctx.getInitialFrameSize();
-        this.allocateMem = 0;
-        if (memBudgetInBytes == UNLIMITED_MEMORY) {
-            this.memBudget = Integer.MAX_VALUE;
-            this.buffers = new ArrayList<>();
-            this.used = new BitSet();
-        } else {
-            this.memBudget = memBudgetInBytes;
-            this.buffers = new ArrayList<>(memBudgetInBytes / minFrameSize);
-            this.used = new BitSet(memBudgetInBytes / minFrameSize);
-        }
-    }
-
-    @Override
-    public int getMinFrameSize() {
-        return minFrameSize;
-    }
-
-    @Override
-    public int getMemoryBudgetBytes() {
-        return memBudget;
-    }
-
-    @Override
-    public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException {
-        int frameId = findExistingFrame(frameSize);
-        if (frameId >= 0) {
-            return reuseFrame(frameId);
-        }
-        if (haveEnoughFreeSpace(frameSize)) {
-            return createNewFrame(frameSize);
-        }
-        return mergeExistingFrames(frameSize);
-
-    }
-
-    private boolean haveEnoughFreeSpace(int frameSize) {
-        return frameSize + allocateMem <= memBudget;
-    }
-
-    private static int getFirstUnUsedPos(BitSet used) {
-        return used.nextClearBit(0);
-    }
-
-    private static int getLastUnUsedPos(BitSet used, int lastPos) {
-        return used.previousClearBit(lastPos);
-    }
-
-    private static int binarySearchUnusedBuffer(ArrayList<ByteBuffer> buffers, BitSet used, int frameSize) {
-        int l = getFirstUnUsedPos(used); // to skip the merged null buffers
-        int h = getLastUnUsedPos(used, (buffers.size() - 1)) + 1; // to skip the newly created buffers
-        if (l >= h) {
-            return -1;
-        }
-        int highest = h;
-        int mid = (l + h) / 2;
-        while (l < h) {
-            ByteBuffer buffer = buffers.get(mid);
-            if (buffer.capacity() == frameSize) {
-                break;
-            }
-            if (buffer.capacity() < frameSize) {
-                l = mid + 1;
-            } else {
-                h = mid;
-            }
-            mid = (l + h) / 2;
-        }
-        mid = used.nextClearBit(mid);
-        return mid < highest ? mid : -1;
-    }
-
-    private int findExistingFrame(int frameSize) {
-        return binarySearchUnusedBuffer(buffers, used, frameSize);
-    }
-
-    private ByteBuffer reuseFrame(int id) {
-        used.set(id);
-        buffers.get(id).clear();
-        return buffers.get(id);
-    }
-
-    private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException {
-        buffers.add(ctx.allocateFrame(frameSize));
-        allocateMem += frameSize;
-        return reuseFrame(buffers.size() - 1);
-    }
-
-    /**
-     * The merging sequence is from the smallest to the largest order.
-     * Once the buffer get merged, it will be remove from the list in order to free the object.
-     * And the index spot of it will be marked as used.
-     *
-     * @param frameSize
-     * @return
-     * @throws HyracksDataException
-     */
-    private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException {
-        int mergedSize = memBudget - allocateMem;
-        int highBound = getLastUnUsedPos(used, buffers.size() - 1) + 1;
-        for (int i = getFirstUnUsedPos(used); i < highBound; ++i) {
-            if (!used.get(i)) {
-                mergedSize += deAllocateFrame(i);
-                if (mergedSize >= frameSize) {
-                    return createNewFrame(mergedSize);
-                }
-            }
-        }
-        return null;
-    }
-
-    private int deAllocateFrame(int id) {
-        ByteBuffer frame = buffers.get(id);
-        ctx.deallocateFrames(frame.capacity());
-        buffers.set(id, null);
-        used.set(id);
-        allocateMem -= frame.capacity();
-        return frame.capacity();
-    }
-
-    @Override
-    public void reset() {
-        removeEmptySpot(buffers);
-        Collections.sort(buffers, sizeByteBufferComparator);
-        used.clear();
-    }
-
-    private static void removeEmptySpot(List<ByteBuffer> buffers) {
-        for (int i = 0; i < buffers.size(); ) {
-            if (buffers.get(i) == null) {
-                buffers.remove(i);
-            } else {
-                i++;
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        buffers.clear();
-        used.clear();
-        allocateMem = 0;
-    }
-
-    private static Comparator<ByteBuffer> sizeByteBufferComparator = new Comparator<ByteBuffer>() {
-        @Override
-        public int compare(ByteBuffer o1, ByteBuffer o2) {
-            if (o1.capacity() == o2.capacity()) {
-                return 0;
-            }
-            return o1.capacity() < o2.capacity() ? -1 : 1;
-        }
-    };
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
deleted file mode 100644
index 20642bf..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.dataflow.std.sort.buffermanager;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
-import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class VariableTupleMemoryManager implements ITupleBufferManager {
-
-    private final static Logger LOG = Logger.getLogger(VariableTupleMemoryManager.class.getName());
-
-    private final int MIN_FREE_SPACE;
-    private final IFramePool pool;
-    private final IFrameFreeSlotPolicy policy;
-    private final IAppendDeletableFrameTupleAccessor accessor;
-    private final ArrayList<ByteBuffer> frames;
-    private final RecordDescriptor recordDescriptor;
-    private int numTuples;
-    private int statsReOrg;
-
-    public VariableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
-        this.pool = framePool;
-        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
-        this.policy = new FrameFreeSlotLastFit(maxFrames);
-        this.accessor = new DeletableFrameTupleAppender(recordDescriptor);
-        this.frames = new ArrayList<>();
-        this.MIN_FREE_SPACE = calculateMinFreeSpace(recordDescriptor);
-        this.recordDescriptor = recordDescriptor;
-        this.numTuples = 0;
-        this.statsReOrg = 0;
-    }
-
-    @Override
-    public void reset() throws HyracksDataException {
-        pool.reset();
-        policy.reset();
-        frames.clear();
-        numTuples = 0;
-    }
-
-    @Override
-    public int getNumTuples() {
-        return numTuples;
-    }
-
-    @Override
-    public boolean insertTuple(IFrameTupleAccessor fta, int idx, TuplePointer tuplePointer)
-            throws HyracksDataException {
-        int requiredFreeSpace = calculatePhysicalSpace(fta, idx);
-        int frameId = findAvailableFrame(requiredFreeSpace);
-        if (frameId < 0) {
-            if (canBeInsertedAfterCleanUpFragmentation(requiredFreeSpace)) {
-                reOrganizeFrames();
-                frameId = findAvailableFrame(requiredFreeSpace);
-                statsReOrg++;
-            } else {
-                return false;
-            }
-        }
-        assert frameId >= 0;
-        accessor.reset(frames.get(frameId));
-        assert accessor.getContiguousFreeSpace() >= requiredFreeSpace;
-        int tid = accessor.append(fta, idx);
-        assert tid >= 0;
-        tuplePointer.reset(frameId, tid);
-        if (accessor.getContiguousFreeSpace() > MIN_FREE_SPACE) {
-            policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace());
-        }
-        numTuples++;
-        return true;
-    }
-
-    private void reOrganizeFrames() {
-        policy.reset();
-        for (int i = 0; i < frames.size(); i++) {
-            accessor.reset(frames.get(i));
-            accessor.reOrganizeBuffer();
-            policy.pushNewFrame(i, accessor.getContiguousFreeSpace());
-        }
-    }
-
-    private boolean canBeInsertedAfterCleanUpFragmentation(int requiredFreeSpace) {
-        for (int i = 0; i < frames.size(); i++) {
-            accessor.reset(frames.get(i));
-            if (accessor.getTotalFreeSpace() >= requiredFreeSpace) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private int findAvailableFrame(int requiredFreeSpace) throws HyracksDataException {
-        int frameId = policy.popBestFit(requiredFreeSpace);
-        if (frameId >= 0) {
-            return frameId;
-        }
-
-        int frameSize = calculateMinFrameSizeToPlaceTuple(requiredFreeSpace, pool.getMinFrameSize());
-        ByteBuffer buffer = pool.allocateFrame(frameSize);
-        if (buffer != null) {
-            accessor.clear(buffer);
-            frames.add(buffer);
-            return frames.size() - 1;
-        }
-        return -1;
-    }
-
-    private static int calculateMinFrameSizeToPlaceTuple(int requiredFreeSpace, int minFrameSize) {
-        return (1 + (requiredFreeSpace + 4 - 1) / minFrameSize) * minFrameSize;
-    }
-
-    private static int calculatePhysicalSpace(IFrameTupleAccessor fta, int idx) {
-        // 4 bytes to store the offset
-        return 4 + fta.getTupleLength(idx);
-    }
-
-    private static int calculateMinFreeSpace(RecordDescriptor recordDescriptor) {
-        // + 4 for the tuple offset
-        return recordDescriptor.getFieldCount() * 4 + 4;
-    }
-
-    @Override
-    public void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException {
-        accessor.reset(frames.get(tuplePointer.frameIndex));
-        accessor.delete(tuplePointer.tupleIndex);
-        numTuples--;
-    }
-
-    @Override
-    public void close() {
-        pool.close();
-        policy.reset();
-        frames.clear();
-        numTuples = 0;
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine("VariableTupleMemoryManager has reorganized " + statsReOrg + " times");
-        }
-        statsReOrg = 0;
-    }
-
-    @Override
-    public ITupleBufferAccessor getTupleAccessor() {
-        return new ITupleBufferAccessor() {
-            private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
-                    recordDescriptor);
-            private int tid;
-
-            @Override
-            public void reset(TuplePointer tuplePointer) {
-                bufferAccessor.reset(frames.get(tuplePointer.frameIndex));
-                tid = tuplePointer.tupleIndex;
-            }
-
-            @Override
-            public ByteBuffer getTupleBuffer() {
-                return bufferAccessor.getBuffer();
-            }
-
-            @Override
-            public int getTupleStartOffset() {
-                return bufferAccessor.getTupleStartOffset(tid);
-            }
-
-            @Override
-            public int getTupleLength() {
-                return bufferAccessor.getTupleLength(tid);
-            }
-
-            @Override
-            public int getAbsFieldStartOffset(int fieldId) {
-                return bufferAccessor.getAbsoluteFieldStartOffset(tid, fieldId);
-            }
-
-            @Override
-            public int getFieldLength(int fieldId) {
-                return bufferAccessor.getFieldLength(tid, fieldId);
-            }
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
index f77ec63..31ea07d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
@@ -26,7 +26,7 @@ import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * Basically it a union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}.
+ * Basically, it is a union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}.
  * Moreover, it has the delete function as well.
  * This is a special TupleAccessor used for TopK sorting.
  * In HeapSort, or other Tuple-based operators, we need to append the tuple, access the arbitrary previously

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
index c6ca09b..d9a62c8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/AbstractHeap.java
@@ -72,9 +72,7 @@ public abstract class AbstractHeap implements IHeap<IResetableComparable> {
 
     @Override
     public void reset() {
-        for (int i = 0; i < numEntry; i++) {
-            entries[i] = null;
-        }
+        Arrays.fill(entries, null);
         numEntry = 0;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
index ceae0f1..8cd6792 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -22,15 +22,19 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ISerializableTable {
 
-    public void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
+    void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
 
-    public void getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
+    void delete(int entry);
 
-    public int getFrameCount();
+    boolean getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
 
-    public int getTupleCount();
+    int getFrameCount();
 
-    public void reset();
+    int getTupleCount();
 
-    public void close();
+    int getTupleCount(int entry);
+
+    void reset();
+
+    void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index 7db57c0..1f2ebef 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.structures;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -36,14 +37,14 @@ public class SerializableHashTable implements ISerializableTable {
     private IntSerDeBuffer[] headers;
     private List<IntSerDeBuffer> contents = new ArrayList<IntSerDeBuffer>();
     private List<Integer> frameCurrentIndex = new ArrayList<Integer>();
-    private final IHyracksTaskContext ctx;
-    private int frameCapacity = 0;
+    private final IHyracksFrameMgrContext ctx;
+    private final int frameCapacity;
     private int currentLargestFrameIndex = 0;
     private int tupleCount = 0;
     private int headerFrameCount = 0;
     private TuplePointer tempTuplePointer = new TuplePointer();
 
-    public SerializableHashTable(int tableSize, final IHyracksTaskContext ctx) throws HyracksDataException {
+    public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException {
         this.ctx = ctx;
         int frameSize = ctx.getInitialFrameSize();
 
@@ -81,28 +82,45 @@ public class SerializableHashTable implements ISerializableTable {
     }
 
     @Override
-    public void getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
+    public void delete(int entry) {
+        int hFrameIndex = getHeaderFrameIndex(entry);
+        int headerOffset = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[hFrameIndex];
+        if (header != null) {
+            int frameIndex = header.getInt(headerOffset);
+            int offsetIndex = header.getInt(headerOffset + 1);
+            if (frameIndex >= 0) {
+                IntSerDeBuffer frame = contents.get(frameIndex);
+                int entryUsedItems = frame.getInt(offsetIndex + 1);
+                frame.writeInt(offsetIndex + 1, 0);
+                tupleCount -= entryUsedItems;
+            }
+        }
+    }
+
+    @Override
+    public boolean getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
         int hFrameIndex = getHeaderFrameIndex(entry);
         int headerOffset = getHeaderFrameOffset(entry);
         IntSerDeBuffer header = headers[hFrameIndex];
         if (header == null) {
             dataPointer.frameIndex = -1;
             dataPointer.tupleIndex = -1;
-            return;
+            return false;
         }
         int frameIndex = header.getInt(headerOffset);
         int offsetIndex = header.getInt(headerOffset + 1);
         if (frameIndex < 0) {
             dataPointer.frameIndex = -1;
             dataPointer.tupleIndex = -1;
-            return;
+            return false;
         }
         IntSerDeBuffer frame = contents.get(frameIndex);
         int entryUsedItems = frame.getInt(offsetIndex + 1);
         if (offset > entryUsedItems - 1) {
             dataPointer.frameIndex = -1;
             dataPointer.tupleIndex = -1;
-            return;
+            return false;
         }
         int startIndex = offsetIndex + 2 + offset * 2;
         while (startIndex >= frameCapacity) {
@@ -112,6 +130,7 @@ public class SerializableHashTable implements ISerializableTable {
         frame = contents.get(frameIndex);
         dataPointer.frameIndex = frame.getInt(startIndex);
         dataPointer.tupleIndex = frame.getInt(startIndex + 1);
+        return true;
     }
 
     @Override
@@ -139,9 +158,26 @@ public class SerializableHashTable implements ISerializableTable {
     }
 
     @Override
+    public int getTupleCount(int entry) {
+        int hFrameIndex = getHeaderFrameIndex(entry);
+        int headerOffset = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[hFrameIndex];
+        if (header != null) {
+            int frameIndex = header.getInt(headerOffset);
+            int offsetIndex = header.getInt(headerOffset + 1);
+            if (frameIndex >= 0) {
+                IntSerDeBuffer frame = contents.get(frameIndex);
+                int entryUsedItems = frame.getInt(offsetIndex + 1);
+                return entryUsedItems;
+            }
+        }
+        return 0;
+    }
+
+    @Override
     public void close() {
         int nFrames = contents.size();
-    	for (int i = 0; i < headers.length; i++)
+        for (int i = 0; i < headers.length; i++)
             headers[i] = null;
         contents.clear();
         frameCurrentIndex.clear();
@@ -259,31 +295,30 @@ public class SerializableHashTable implements ISerializableTable {
         return offset;
     }
 
-}
-
-class IntSerDeBuffer {
+    class IntSerDeBuffer {
 
-    private byte[] bytes;
+        private byte[] bytes;
 
-    public IntSerDeBuffer(byte[] data) {
-        this.bytes = data;
-    }
+        public IntSerDeBuffer(byte[] data) {
+            this.bytes = data;
+        }
 
-    public int getInt(int pos) {
-        int offset = pos * 4;
-        return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
-                + ((bytes[offset + 3] & 0xff) << 0);
-    }
+        public int getInt(int pos) {
+            int offset = pos * 4;
+            return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
+                    + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
+        }
 
-    public void writeInt(int pos, int value) {
-        int offset = pos * 4;
-        bytes[offset++] = (byte) (value >> 24);
-        bytes[offset++] = (byte) (value >> 16);
-        bytes[offset++] = (byte) (value >> 8);
-        bytes[offset++] = (byte) (value);
-    }
+        public void writeInt(int pos, int value) {
+            int offset = pos * 4;
+            bytes[offset++] = (byte) (value >> 24);
+            bytes[offset++] = (byte) (value >> 16);
+            bytes[offset++] = (byte) (value >> 8);
+            bytes[offset++] = (byte) (value);
+        }
 
-    public int capacity() {
-        return bytes.length / 4;
+        public int capacity() {
+            return bytes.length / 4;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
new file mode 100644
index 0000000..c74fe04
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.util;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+
+public class FrameTuplePairComparator {
+    private final int[] keys0;
+    private final int[] keys1;
+    private final IBinaryComparator[] comparators;
+
+    public FrameTuplePairComparator(int[] keys0, int[] keys1, IBinaryComparator[] comparators) {
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.comparators = comparators;
+    }
+
+    public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < keys0.length; ++i) {
+            int fIdx0 = keys0[i];
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fIdx1 = keys1[i];
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, fIdx1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    public int compare(IFrameTupleAccessor accessor0, int tIndex0, ITuplePointerAccessor bufferAccessor)
+            throws HyracksDataException {
+        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+        for (int i = 0; i < keys0.length; ++i) {
+            int fIdx0 = keys0[i];
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, fIdx0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, fIdx0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = bufferAccessor.getAbsFieldStartOffset(keys1[i]);
+            int fLen1 = bufferAccessor.getFieldLength(keys1[i]);
+
+            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, bufferAccessor
+                    .getBuffer().array(), fStart1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java
new file mode 100644
index 0000000..3e4e578
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractFramePoolTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.NUM_MIN_FRAME;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractFramePoolTest {
+    IFramePool pool;
+
+    @Test
+    public void testGetMinFrameSize() throws Exception {
+        assertEquals(MIN_FRAME_SIZE, commonFrameManager.getInitialFrameSize());
+        assertEquals(MIN_FRAME_SIZE, pool.getMinFrameSize());
+    }
+
+    @Test
+    public void testGetMemoryBudgetBytes() throws Exception {
+        assertEquals(BUDGET, pool.getMemoryBudgetBytes());
+    }
+
+    protected void testAllocateShouldFailAfterAllSpaceGetUsed() throws HyracksDataException {
+        for (int i = 0; i < NUM_MIN_FRAME; i++) {
+            assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+        }
+    }
+
+    protected HashSet<ByteBufferPtr> testAllocateAllSpacesWithMinFrames() throws HyracksDataException {
+        HashSet<ByteBufferPtr> set = new HashSet<>();
+        for (int i = 0; i < NUM_MIN_FRAME; i++) {
+            testAllocateNewBuffer(set, MIN_FRAME_SIZE);
+        }
+        return set;
+    }
+
+    protected void testAllocateNewBuffer(HashSet<ByteBufferPtr> set, int frameSize) throws HyracksDataException {
+        ByteBuffer buffer = pool.allocateFrame(frameSize);
+        assertNotNull(buffer);
+        assertEquals(buffer.capacity(), frameSize);
+        assertTrue(!set.contains(new ByteBufferPtr(buffer)));
+        set.add(new ByteBufferPtr(buffer));
+    }
+
+    /**
+     * Allocates frames of 1x, 2x, 3x, ... MIN_FRAME_SIZE until BUDGET is used up (sizes 1,2,3,4,5 for a 15-frame budget).
+     *
+     * @throws HyracksDataException
+     */
+    protected Set<ByteBufferPtr> testAllocateVariableFrames() throws HyracksDataException {
+        int budget = BUDGET;
+        int allocate = 0;
+        int i = 1;
+        Set<ByteBufferPtr> set = new HashSet<>();
+        while (budget - allocate >= i * MIN_FRAME_SIZE) {
+            ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            set.add(new ByteBufferPtr(buffer));
+            allocate += i++ * MIN_FRAME_SIZE;
+        }
+        return set;
+    }
+
+    protected void testShouldFindTheMatchFrames(Set<?> set) throws HyracksDataException {
+        pool.reset();
+        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
+
+        for (int i = 0; i < list.size(); i++) {
+            ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+            assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+        }
+        pool.reset();
+        for (int i = list.size() - 1; i >= 0; i--) {
+            ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+            assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+        }
+
+        Collections.shuffle(list);
+        pool.reset();
+        for (int i = 0; i < list.size(); i++) {
+            ByteBuffer buffer = pool.allocateFrame(list.get(i) * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+            assertEquals(list.get(i) * MIN_FRAME_SIZE, buffer.capacity());
+        }
+
+    }
+
+    public static class ByteBufferPtr {
+        ByteBuffer bytebuffer;
+
+        public ByteBufferPtr(ByteBuffer buffer) {
+            bytebuffer = buffer;
+        }
+
+        @Override
+        public int hashCode() {
+            return bytebuffer.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return this.bytebuffer == ((ByteBufferPtr) obj).bytebuffer;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java
new file mode 100644
index 0000000..cdf8834
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/AbstractTupleMemoryManagerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+import org.apache.hyracks.dataflow.std.sort.Utility;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public abstract class AbstractTupleMemoryManagerTest {
+    ISerializerDeserializer[] fieldsSerDer = new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+    RecordDescriptor recordDescriptor = new RecordDescriptor(fieldsSerDer);
+    ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
+    FrameTupleAccessor inFTA = new FrameTupleAccessor(recordDescriptor);
+    Random random = new Random(System.currentTimeMillis());
+
+    abstract ITuplePointerAccessor getTupleAccessor();
+
+    protected void assertEachTupleInFTAIsInBuffer(Map<Integer, Integer> map, Map<TuplePointer, Integer> mapInserted) {
+        ITuplePointerAccessor accessor = getTupleAccessor();
+        for (Map.Entry<TuplePointer, Integer> entry : mapInserted.entrySet()) {
+            accessor.reset(entry.getKey());
+            int dataLength = map.get(entry.getValue());
+            assertEquals((int) entry.getValue(),
+                    IntSerDeUtils.getInt(accessor.getBuffer().array(), accessor.getAbsFieldStartOffset(0)));
+            assertEquals(dataLength, accessor.getTupleLength());
+        }
+        assertEquals(map.size(), mapInserted.size());
+    }
+
+    protected Map<Integer, Integer> prepareFixedSizeTuples(
+            int tuplePerFrame,
+            int extraMetaBytePerFrame,
+            int extraMetaBytePerRecord) throws HyracksDataException {
+        Map<Integer, Integer> dataSet = new HashMap<>();
+        ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET);
+        FixedSizeFrame frame = new FixedSizeFrame(buffer);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+
+        int sizePerTuple = (Common.MIN_FRAME_SIZE - 1 - tuplePerFrame * 4 - 4 - extraMetaBytePerFrame) / tuplePerFrame;
+        int sizeChar =
+                sizePerTuple - extraMetaBytePerRecord - fieldsSerDer.length * 4 - 4 - 2; //2byte to write str length
+        assert (sizeChar > 0);
+        for (int i = 0; i < Common.NUM_MIN_FRAME * tuplePerFrame; i++) {
+            tupleBuilder.reset();
+            tupleBuilder.addField(fieldsSerDer[0], i);
+            tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', sizeChar));
+            if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                assert false;
+            }
+            dataSet.put(i, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4);
+        }
+        inFTA.reset(buffer);
+        return dataSet;
+    }
+
+    protected Map<Integer, Integer> prepareVariableSizeTuples() throws HyracksDataException {
+        Map<Integer, Integer> dataSet = new HashMap<>();
+        ByteBuffer buffer = ByteBuffer.allocate(Common.BUDGET);
+        FixedSizeFrame frame = new FixedSizeFrame(buffer);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(frame, true);
+
+        for (int i = 0; true; i++) {
+            tupleBuilder.reset();
+            tupleBuilder.addField(fieldsSerDer[0], i);
+            tupleBuilder.addField(fieldsSerDer[1], Utility.repeatString('a', i));
+            if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                break;
+            }
+            dataSet.put(i, tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * 4);
+        }
+        inFTA.reset(buffer);
+        return dataSet;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java
new file mode 100644
index 0000000..7389aab
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/Common.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+
+public class Common {
+    static int MIN_FRAME_SIZE = 256;
+    static int NUM_MIN_FRAME = 15;
+    static int BUDGET = NUM_MIN_FRAME * MIN_FRAME_SIZE;
+
+    static FrameManager commonFrameManager = new FrameManager(MIN_FRAME_SIZE);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java
new file mode 100644
index 0000000..2a84e0e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/DeletableFramePoolTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.HashSet;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DeletableFramePoolTest extends AbstractFramePoolTest {
+
+    @Before
+    public void setUp() {
+        pool = new DeallocatableFramePool(commonFrameManager, BUDGET);
+    }
+
+    DeallocatableFramePool getPool() {
+        return (DeallocatableFramePool) pool;
+    }
+
+    @Test
+    public void testAllocateBuffers() throws HyracksDataException {
+        testAllocateAllSpacesWithMinFrames();
+    }
+
+    @Test
+    public void testCanNotAllocateMore() throws HyracksDataException {
+        testAllocateAllSpacesWithMinFrames();
+        assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+    }
+
+    @Test
+    public void testReusePreAllocatedBuffer() throws HyracksDataException {
+        HashSet<ByteBufferPtr> set = testAllocateAllSpacesWithMinFrames();
+        for (ByteBufferPtr ptr : set) {
+            getPool().deAllocateBuffer(ptr.bytebuffer);
+        }
+        HashSet<ByteBufferPtr> set2 = testAllocateAllSpacesWithMinFrames();
+        assertEquals(set, set2);
+    }
+
+    @Test
+    public void testMergeCase() throws HyracksDataException {
+        HashSet<ByteBufferPtr> set = testAllocateAllSpacesWithMinFrames();
+        for (ByteBufferPtr ptr : set) {
+            getPool().deAllocateBuffer(ptr.bytebuffer);
+        }
+        set.clear();
+        int i = 1;
+        for (int sum = 0; sum + MIN_FRAME_SIZE * i <= BUDGET; i++) {
+            sum += MIN_FRAME_SIZE * i;
+            testAllocateNewBuffer(set, MIN_FRAME_SIZE * i);
+        }
+        assertNull(pool.allocateFrame(MIN_FRAME_SIZE * i));
+        for (ByteBufferPtr ptr : set) {
+            getPool().deAllocateBuffer(ptr.bytebuffer);
+        }
+        set.clear();
+        testAllocateNewBuffer(set, BUDGET);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
new file mode 100644
index 0000000..992c7f6
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBestFitUsingTreeMapTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Exercises {@link FrameFreeSlotSmallestFit} through push / pop / reset round trips. */
+public class FrameFreeSlotBestFitUsingTreeMapTest {
+    static final int size = 10;
+
+    FrameFreeSlotSmallestFit policy;
+
+    @Before
+    public void intial() {
+        policy = new FrameFreeSlotSmallestFit();
+    }
+
+    @Test
+    public void testAll() {
+        // One at a time: pushNewFrame(i, i) followed by popBestFit(i) must hand back i.
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+            assertEquals(i, policy.popBestFit(i));
+        }
+        // Everything pushed so far was popped again, so the next pop must report -1.
+        assertEquals(-1, policy.popBestFit(0));
+
+        // Push all frames up front; popBestFit(i) must still hand back i for every i.
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+        }
+        for (int i = 0; i < size; i++) {
+            assertEquals(i, policy.popBestFit(i));
+        }
+    }
+
+    /** Runs the whole scenario, resets the policy, then runs it again. */
+    @Test
+    public void testReset() {
+        testAll();
+        policy.reset();
+        testAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java
new file mode 100644
index 0000000..88a54bd
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotBiggestFirstTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static junit.framework.Assert.assertEquals;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Exercises {@link FrameFreeSlotBiggestFirst} through push / pop / reset round trips. */
+public class FrameFreeSlotBiggestFirstTest {
+    static final int size = 10;
+
+    FrameFreeSlotBiggestFirst policy;
+
+    @Before
+    public void intial() {
+        policy = new FrameFreeSlotBiggestFirst(size);
+    }
+
+    @Test
+    public void testAll() {
+        // One at a time: pushNewFrame(i, i) followed by popBestFit(i) must hand back i.
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+            assertEquals(i, policy.popBestFit(i));
+        }
+        assertEquals(-1, policy.popBestFit(0));
+
+        // With all frames pushed, popBestFit(0) must hand the ids back in descending order.
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+        }
+        for (int i = 0; i < size; i++) {
+            assertEquals(size - i - 1, policy.popBestFit(0));
+        }
+
+        // popBestFit(size / 2) drains ids size-1 .. size/2 and then reports -1; popBestFit(0) returns the rest.
+        for (int i = 0; i < size; i++) {
+            policy.pushNewFrame(i, i);
+        }
+        for (int i = 0; i < size / 2; i++) {
+            assertEquals(size - i - 1, policy.popBestFit(size / 2));
+        }
+        assertEquals(-1, policy.popBestFit(size / 2));
+        for (int i = 0; i < size / 2; i++) {
+            assertEquals(size / 2 - i - 1, policy.popBestFit(0));
+        }
+    }
+
+    @Test
+    public void testReset() {
+        testAll();
+        policy.reset();
+        testAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java
new file mode 100644
index 0000000..87b1b91
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/FrameFreeSlotLastFitTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Exercises {@link FrameFreeSlotLastFit} with equal, ascending and descending second arguments to pushNewFrame. */
+public class FrameFreeSlotLastFitTest {
+    FrameFreeSlotLastFit zeroPolicy;
+    FrameFreeSlotLastFit unifiedPolicy;
+    FrameFreeSlotLastFit ascPolicy;
+    FrameFreeSlotLastFit dscPolicy;
+
+    static final int size = 10;
+    static final int medium = 5;
+
+    @Before
+    public void setUp() throws Exception {
+        zeroPolicy = new FrameFreeSlotLastFit(0);
+        unifiedPolicy = new FrameFreeSlotLastFit(size);
+        ascPolicy = new FrameFreeSlotLastFit(size);
+        dscPolicy = new FrameFreeSlotLastFit(size);
+    }
+
+    @Test
+    public void testPushAndPop() throws Exception {
+        // Every frame is pushed with the same second argument (medium): pops must return ids in reverse push order.
+        for (int i = 0; i < size; i++) {
+            unifiedPolicy.pushNewFrame(i, medium);
+        }
+        for (int i = 0; i < size; i++) {
+            assertEquals(size - i - 1, unifiedPolicy.popBestFit(medium));
+        }
+        assertEquals(-1, unifiedPolicy.popBestFit(0));
+
+        // First loop: asc values are below medium (pop gives -1), dsc values are not (pop gives i); second loop: reversed.
+        for (int i = 0; i < size / 2; i++) {
+            ascPolicy.pushNewFrame(i, i);
+            assertEquals(-1, ascPolicy.popBestFit(medium));
+            dscPolicy.pushNewFrame(i, size - i - 1);
+            assertEquals(i, dscPolicy.popBestFit(medium));
+        }
+        for (int i = size / 2; i < size; i++) {
+            ascPolicy.pushNewFrame(i, i);
+            assertEquals(i, ascPolicy.popBestFit(medium));
+            dscPolicy.pushNewFrame(i, size - i - 1);
+            assertEquals(-1, dscPolicy.popBestFit(medium));
+        }
+
+        // Refill asc with ids size..1, each pushed with its own id as second argument; exact requests must find each id.
+        ascPolicy.reset();
+        for (int i = 0; i < size; i++) {
+            ascPolicy.pushNewFrame(size - i, size - i);
+        }
+        for (int i = 0; i < size; i++) {
+            assertEquals(size - i, ascPolicy.popBestFit(size - i));
+        }
+    }
+
+    @Test
+    public void testReset() throws Exception {
+        testPushAndPop();
+        zeroPolicy.reset();
+        unifiedPolicy.reset();
+        ascPolicy.reset();
+        dscPolicy.reset();
+        testPushAndPop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
new file mode 100644
index 0000000..ce31108
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests {@link VPartitionTupleBufferManager}: one partition at a time is filled until it occupies Common.BUDGET. */
+public class VPartitionTupleBufferManagerTest extends AbstractTupleMemoryManagerTest {
+    VPartitionTupleBufferManager bufferManager;
+    static final int PARTITION = 4;
+
+    @Before
+    public void setup() throws HyracksDataException {
+        IPartitionedMemoryConstrain constrain = new IPartitionedMemoryConstrain() {
+            @Override
+            public int frameLimit(int partitionId) {
+                return Integer.MAX_VALUE;
+            }
+        };
+        bufferManager = new VPartitionTupleBufferManager(Common.commonFrameManager, constrain, PARTITION,
+                Common.BUDGET);
+    }
+
+    @Test
+    public void testGetNumPartitions() throws Exception {
+        assertEquals(PARTITION, bufferManager.getNumPartitions());
+    }
+
+    @Test
+    public void testGetNumTuples() throws Exception {
+        testNumTuplesAndSizeIsZero();
+    }
+
+    @Test
+    public void testInsertToFull() throws HyracksDataException {
+        Map<Integer, Integer> inMap = prepareFixedSizeTuples(10, 0, 0);
+        for (int pid = 0; pid < PARTITION; pid++) {
+            assertInsertOnePartitionToFull(pid, inMap);
+            bufferManager.reset();
+        }
+    }
+
+    // NOTE(review): currently identical to testInsertToFull; confirm whether a clear step was intended here.
+    @Test
+    public void testInsertClearSequence() throws HyracksDataException {
+        Map<Integer, Integer> inMap = prepareFixedSizeTuples(10, 0, 0);
+        for (int pid = 0; pid < PARTITION; pid++) {
+            assertInsertOnePartitionToFull(pid, inMap);
+            bufferManager.reset();
+        }
+    }
+
+    /** Fills partition {@code pid}, then checks its tuple count and size and that no partition accepts more. */
+    private void assertInsertOnePartitionToFull(int pid, Map<Integer, Integer> inMap) throws HyracksDataException {
+        testNumTuplesAndSizeIsZero();
+        Map<TuplePointer, Integer> outMap = testInsertOnePartitionToFull(pid);
+        assertEquals(outMap.size(), bufferManager.getNumTuples(pid));
+        assertEquals(Common.BUDGET, bufferManager.getPhysicalSize(pid));
+        testCanNotInsertToAnyPartitions();
+        assertEachTupleInFTAIsInBuffer(inMap, outMap);
+    }
+
+    private void testCanNotInsertToAnyPartitions() throws HyracksDataException {
+        for (int i = 0; i < PARTITION; i++) {
+            assertFalse(bufferManager.insertTuple(i, tupleBuilder.getByteArray(), tupleBuilder.getFieldEndOffsets(), 0,
+                    tupleBuilder.getSize(), null));
+        }
+    }
+
+    /** Inserts every tuple of inFTA into partition idpart, mapping each pointer to the int at that tuple's field 0. */
+    private Map<TuplePointer, Integer> testInsertOnePartitionToFull(int idpart) throws HyracksDataException {
+        Map<TuplePointer, Integer> tuplePointerIntegerMap = new HashMap<>();
+        for (int i = 0; i < inFTA.getTupleCount(); i++) {
+            TuplePointer tuplePointer = new TuplePointer();
+            copyDataToTupleBuilder(inFTA, i, tupleBuilder);
+            if (!bufferManager.insertTuple(idpart, tupleBuilder.getByteArray(), tupleBuilder.getFieldEndOffsets(), 0,
+                    tupleBuilder.getSize(), tuplePointer)) {
+                // Fail unconditionally; a plain `assert` would be a no-op unless the JVM runs with -ea.
+                throw new AssertionError("insert of tuple " + i + " into partition " + idpart + " was rejected");
+            }
+            tuplePointerIntegerMap.put(tuplePointer,
+                    IntSerDeUtils.getInt(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(i, 0)));
+        }
+        return tuplePointerIntegerMap;
+    }
+
+    private static void copyDataToTupleBuilder(FrameTupleAccessor inFTA, int tid, ArrayTupleBuilder tupleBuilder)
+            throws HyracksDataException {
+        tupleBuilder.reset();
+        for (int fid = 0; fid < inFTA.getFieldCount(); fid++) {
+            tupleBuilder.addField(inFTA.getBuffer().array(), inFTA.getAbsoluteFieldStartOffset(tid, fid),
+                    inFTA.getFieldLength(tid, fid));
+        }
+    }
+
+    /** Asserts that every partition holds no tuples and occupies no physical space. */
+    private void testNumTuplesAndSizeIsZero() {
+        for (int i = 0; i < bufferManager.getNumPartitions(); ++i) {
+            assertEquals(0, bufferManager.getNumTuples(i));
+            assertEquals(0, bufferManager.getPhysicalSize(i));
+        }
+    }
+
+    @Test
+    public void testClearPartition() throws Exception {
+        Map<Integer, Integer> inMap = prepareFixedSizeTuples(10, 0, 0);
+        for (int pid = 0; pid < PARTITION; pid++) {
+            assertInsertOnePartitionToFull(pid, inMap);
+            assertClearFullPartitionIsTheSameAsReset();
+        }
+    }
+
+    private void assertClearFullPartitionIsTheSameAsReset() throws HyracksDataException {
+        for (int i = 0; i < PARTITION; i++) {
+            bufferManager.clearPartition(i);
+        }
+        testNumTuplesAndSizeIsZero();
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        testInsertToFull();
+        bufferManager.close();
+    }
+
+    @Override
+    ITuplePointerAccessor getTupleAccessor() {
+        return bufferManager.getTupleAccessor(recordDescriptor);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java
new file mode 100644
index 0000000..8dbe1f0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePoolTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.BUDGET;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.MIN_FRAME_SIZE;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.NUM_MIN_FRAME;
+import static org.apache.hyracks.dataflow.std.buffermanager.Common.commonFrameManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/** Tests {@link VariableFramePool} using the allocation helpers inherited from {@code AbstractFramePoolTest}. */
+public class VariableFramePoolTest extends AbstractFramePoolTest {
+    @Before
+    public void setUp() throws Exception {
+        pool = new VariableFramePool(commonFrameManager, BUDGET);
+    }
+
+    @Test
+    public void testAllocateUniformFrameShouldSuccess() throws Exception {
+        testAllocateAllSpacesWithMinFrames();
+        testAllocateShouldFailAfterAllSpaceGetUsed();
+        pool.reset();
+        testAllocateAllSpacesWithMinFrames();
+        pool.close();
+    }
+
+    @Test
+    public void testResetShouldReuseExistingFrames() throws HyracksDataException {
+        Set<?> set1 = testAllocateAllSpacesWithMinFrames();
+        pool.reset();
+        Set<?> set2 = testAllocateAllSpacesWithMinFrames();
+        assertEquals(set1, set2);
+        pool.close();
+    }
+
+    @Test
+    public void testCloseShouldNotReuseExistingFrames() throws HyracksDataException {
+        Set<?> set1 = testAllocateAllSpacesWithMinFrames();
+        pool.close();
+        Set<?> set2 = testAllocateAllSpacesWithMinFrames();
+        assertFalse(set1.equals(set2));
+        pool.close();
+    }
+
+    @Test
+    public void testShouldReturnLargerFramesIfFitOneIsUsed() throws HyracksDataException {
+        Set<?> set = testAllocateVariableFrames();
+        pool.reset();
+        testShouldFindTheMatchFrames(set);
+        pool.reset();
+        // allocate seq (in units of MIN_FRAME_SIZE): 1, 1, 2, 3, 4; every buffer must come from the earlier set
+        ByteBuffer placeBuffer = pool.allocateFrame(MIN_FRAME_SIZE);
+        assertTrue(set.contains(new ByteBufferPtr(placeBuffer)));
+        for (int i = 1; i <= 4; i++) {
+            ByteBuffer buffer = pool.allocateFrame(i * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            assertTrue(set.contains(new ByteBufferPtr(buffer)));
+        }
+        assertNull(pool.allocateFrame(MIN_FRAME_SIZE));
+        pool.close();
+    }
+
+    @Test
+    public void testShouldMergeIfNoLargerFrames() throws HyracksDataException {
+        Set<?> set = testAllocateAllSpacesWithMinFrames();
+        pool.reset();
+        int chunks = 5;
+        for (int i = 0; i < NUM_MIN_FRAME; i += chunks) {
+            ByteBuffer buffer = pool.allocateFrame(chunks * MIN_FRAME_SIZE);
+            assertNotNull(buffer);
+            // a 5x frame must be a new buffer, not one of the min-size buffers handed out before the reset
+            assertFalse(set.contains(new ByteBufferPtr(buffer)));
+        }
+    }
+
+    @Test
+    public void testUseMiddleSizeFrameAndNeedToMergeSmallAndBigger() throws HyracksDataException {
+        Set<?> set = testAllocateVariableFrames();
+        pool.reset();
+        // allocate seq (in units of MIN_FRAME_SIZE): 3, 6, 1
+        ByteBuffer buffer = pool.allocateFrame(3 * MIN_FRAME_SIZE);
+        assertTrue(set.contains(new ByteBufferPtr(buffer)));
+        buffer = pool.allocateFrame(6 * MIN_FRAME_SIZE);
+        assertFalse(set.contains(new ByteBufferPtr(buffer)));
+        buffer = pool.allocateFrame(1 * MIN_FRAME_SIZE);
+        assertTrue(set.contains(new ByteBufferPtr(buffer)));
+        assertEquals(5 * MIN_FRAME_SIZE, buffer.capacity());
+        pool.reset();
+    }
+}
\ No newline at end of file