You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2016/02/26 06:54:07 UTC

[10/11] incubator-asterixdb-hyracks git commit: Implemented the memory-bounded HashGroupby and HashJoin for BigObject

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
new file mode 100644
index 0000000..1118bf3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Manage the buffer space in the unit of frame.
+ */
+public interface IFrameBufferManager {
+
+    /**
+     * Reset the counters and flags to initial status. This method should not release the pre-allocated resources.
+     *
+     * @throws org.apache.hyracks.api.exceptions.HyracksDataException
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * @param frameIndex the index of the frame to look up
+     * @param bufferInfo the caller-supplied object, which gets reset to describe the requested frame
+     * @return the filled bufferInfo to facilitate the chain access
+     */
+    BufferInfo getFrame(int frameIndex, BufferInfo bufferInfo);
+
+    /**
+     * @return the number of frames in this buffer
+     */
+    int getNumFrames();
+
+    /**
+     * Writes the whole frame into the buffer.
+     *
+     * @param frame source frame
+     * @return the id of the inserted frame, or -1 if the insertion failed
+     */
+    int insertFrame(ByteBuffer frame) throws HyracksDataException;
+
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
new file mode 100644
index 0000000..8a1e004
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameFreeSlotPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+public interface IFrameFreeSlotPolicy {
+
+    /**
+     * Find the best fit frame id which can hold the data, and then pop it out from the index.
+     * Return -1 if it failed to find any.
+     *
+     * @param tobeInsertedSize the actual size of the data which should include
+     *                         the meta data like the field offset and the tuple
+     *                         count extra size
+     * @return the best fit frame id, or -1 if none was found
+     */
+    int popBestFit(int tobeInsertedSize);
+
+    /**
+     * Register the new free slot into the index.
+     *
+     * @param frameID the id of the frame that owns the free slot
+     * @param freeSpace the size of the free slot (presumably in bytes; confirm against implementations)
+     */
+    void pushNewFrame(int frameID, int freeSpace);
+
+    /**
+     * Clear all the existing free slot information.
+     */
+    void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java
new file mode 100644
index 0000000..2c6b01e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFramePool.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFramePool {
+
+    int getMinFrameSize();
+
+    int getMemoryBudgetBytes();
+
+    /**
+     * Get a frame of given size. <br>
+     * Returns {@code null} if it failed to allocate a frame of the required size.
+     *
+     * @param frameSize the actual size of the frame.
+     * @return the allocated frame, or {@code null} if the allocation failed
+     * @throws HyracksDataException
+     */
+    ByteBuffer allocateFrame(int frameSize) throws HyracksDataException;
+
+    /**
+     * Reset the counters to initial status. This method should not release the pre-allocated resources.
+     */
+    void reset();
+
+    /**
+     * Release the pre-allocated resources.
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java
new file mode 100644
index 0000000..1ba0745
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedMemoryConstrain.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+/**
+ * Provide a customized constraint for different partitions.
+ */
+public interface IPartitionedMemoryConstrain {
+    /**
+     * Get the number of frames that can be used by the specific partition.
+     *
+     * @param partitionId the id of the partition being queried
+     * @return the maximum number of frames that partition may use
+     */
+    int frameLimit(int partitionId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
new file mode 100644
index 0000000..70ca2dd
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public interface IPartitionedTupleBufferManager {
+
+    int getNumPartitions();
+
+    int getNumTuples(int partition);
+
+    int getPhysicalSize(int partition);
+
+    /**
+     * Insert tuple from (byte[] byteArray,int[] fieldEndOffsets, int start, int size) into
+     * specified partition. The handle is written into the {@code pointer}.
+     * <br>
+     * If {@code byteArray} contains the {@code fieldEndOffsets} already, then please set the {@code fieldEndOffsets} as NULL
+     *
+     * @param partition
+     *            the id of the partition to insert the tuple into
+     * @param byteArray
+     *            the byteArray which contains the tuple
+     * @param fieldEndOffsets
+     *            the fieldEndOffsets which comes from the ArrayTupleBuilder, please set it to NULL if the {@code byteArray} already contains the fieldEndOffsets
+     * @param start
+     *            the start offset in the {@code byteArray}
+     * @param size
+     *            the size of the tuple
+     * @param pointer
+     *            the returned pointer indicating the handle inside this buffer manager
+     * @return a boolean value to indicate whether the insertion succeeded or not
+     */
+    boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
+            TuplePointer pointer) throws HyracksDataException;
+
+    /**
+     * Insert tuple {@code tupleId} from the {@code tupleAccessor} into the given partition.
+     * The returned handle is written into the {@code pointer}.
+     *
+     * @param partition
+     *            the id of the partition to insert the tuple
+     * @param tupleAccessor
+     *            the FrameTupleAccessor storage
+     * @param tupleId
+     *            the id of the tuple from the tupleAccessor
+     * @param pointer
+     *            the returned pointer indicating the handle to later fetch the tuple from the buffer manager
+     * @return true if the insertion succeeded. Otherwise return false.
+     * @throws HyracksDataException
+     */
+    boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int tupleId, TuplePointer pointer)
+            throws HyracksDataException;
+
+    /**
+     * Reset to the initial states. The previously allocated resources won't be released in order to be used in the next round.
+     *
+     * @throws HyracksDataException
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * Close the managers which will explicitly release all the allocated resources.
+     */
+    void close();
+
+    ITuplePointerAccessor getTupleAccessor(RecordDescriptor recordDescriptor);
+
+    /**
+     * Flush the particular partition {@code pid} to {@code writer}.
+     * This partition will not be cleared.
+     * Currently it is used by Join where we flush the inner partition to the join (as a frameWriter),
+     * but we will still keep the inner for the next outer partition.
+     *
+     * @param pid the id of the partition to flush
+     * @param writer the writer that receives the content of the partition
+     * @throws HyracksDataException
+     */
+    void flushPartition(int pid, IFrameWriter writer) throws HyracksDataException;
+
+    /**
+     * Clear the memory occupation of the particular partition.
+     *
+     * @param partition the id of the partition to clear
+     * @throws HyracksDataException
+     */
+    void clearPartition(int partition) throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java
new file mode 100644
index 0000000..324401a
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITupleBufferManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Manage the buffer space. Unlike the {@link IFrameBufferManager}, this one allows record-level manipulation.
+ */
+public interface ITupleBufferManager {
+    /**
+     * Reset the counters and flags to initial status. This method should not release the pre-allocated resources.
+     *
+     * @throws org.apache.hyracks.api.exceptions.HyracksDataException
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * @return the number of tuples in this buffer
+     */
+    int getNumTuples();
+
+    boolean insertTuple(IFrameTupleAccessor accessor, int idx, TuplePointer tuplePointer) throws HyracksDataException;
+
+    void close() throws HyracksDataException;
+
+    ITuplePointerAccessor createTupleAccessor();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java
new file mode 100644
index 0000000..a8ab7eb
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ITuplePointerAccessor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * A cursor-like tuple level accessor to point to a tuple physical byte location inside the {@link ITupleBufferManager}.
+ * Some of the buffer managers (e.g. {@link VariableDeletableTupleMemoryManager}) use a different frame structure than
+ * the common {@link org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor} does.
+ * In order to hide the complexity inside the buffer manager, clients can use this Accessor to navigate the internal record.
+ */
+public interface ITuplePointerAccessor extends IFrameTupleAccessor {
+    void reset(TuplePointer tuplePointer);
+
+    int getTupleStartOffset();
+
+    int getTupleLength();
+
+    int getAbsFieldStartOffset(int fieldId);
+
+    int getFieldLength(int fieldId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
new file mode 100644
index 0000000..f9387a9
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.util.BitSet;
+import java.util.function.IntUnaryOperator;
+
+/**
+ * This policy is used to decide which partition in {@link VPartitionTupleBufferManager} should be a victim when
+ * there is not enough space to insert a new element.
+ */
+public class PreferToSpillFullyOccupiedFramePolicy {
+
+    private final IPartitionedTupleBufferManager bufferManager;
+    private final BitSet spilledStatus;
+    private final int minFrameSize;
+
+    public PreferToSpillFullyOccupiedFramePolicy(IPartitionedTupleBufferManager bufferManager, BitSet spilledStatus,
+            int minFrameSize) {
+        this.bufferManager = bufferManager;
+        this.spilledStatus = spilledStatus;
+        this.minFrameSize = minFrameSize;
+    }
+
+    public int selectVictimPartition(int failedToInsertPartition) {
+        // To avoid flushing a half-full frame, it's better to spill the failing partition itself.
+        if (bufferManager.getNumTuples(failedToInsertPartition) > 0) {
+            return failedToInsertPartition;
+        }
+        int partitionToSpill = findSpilledPartitionWithMaxMemoryUsage();
+        int maxToSpillPartSize = 0;
+        // If we couldn't find an already spilled partition, or that one is too small to be worth flushing,
+        // try to flush an in-memory partition.
+        if (partitionToSpill < 0
+                || (maxToSpillPartSize = bufferManager.getPhysicalSize(partitionToSpill)) == minFrameSize) {
+            int partitionInMem = findInMemPartitionWithMaxMemoryUsage();
+            if (partitionInMem >= 0 && bufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
+                partitionToSpill = partitionInMem;
+            }
+        }
+        return partitionToSpill;
+    }
+
+    public int findInMemPartitionWithMaxMemoryUsage() { // candidates are the clear bits of spilledStatus
+        return findMaxSize(spilledStatus.nextClearBit(0), (i) -> spilledStatus.nextClearBit(i + 1));
+    }
+
+    public int findSpilledPartitionWithMaxMemoryUsage() { // candidates are the set bits of spilledStatus
+        return findMaxSize(spilledStatus.nextSetBit(0), (i) -> spilledStatus.nextSetBit(i + 1));
+    }
+
+    private int findMaxSize(int startIndex, IntUnaryOperator nextIndexOp) { // -1 if no candidate has size > 0
+        int pid = -1;
+        int max = 0;
+        for (int i = startIndex; i >= 0 && i < bufferManager.getNumPartitions(); i = nextIndexOp.applyAsInt(i)) {
+            int partSize = bufferManager.getPhysicalSize(i);
+            if (partSize > max) {
+                max = partSize;
+                pid = i;
+            }
+        }
+        return pid;
+    }
+
+    /**
+     * Create a constraint under which an already spilled partition can use at most one frame.
+     *
+     * @param spillStatus the bit set whose set bits mark the spilled partitions
+     * @return a constraint that yields 1 for spilled partitions and {@code Integer.MAX_VALUE} for all others
+     */
+    public static IPartitionedMemoryConstrain createAtMostOneFrameForSpilledPartitionConstrain(BitSet spillStatus) {
+        return new IPartitionedMemoryConstrain() {
+            @Override
+            public int frameLimit(int partitionId) {
+                if (spillStatus.get(partitionId)) {
+                    return 1;
+                }
+                return Integer.MAX_VALUE;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
new file mode 100644
index 0000000..ed54973
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FixedSizeFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * This buffer manager divides the buffers into the given number of partitions.
+ * The cleared partition (spilled one in the caller side) can only get no more than one frame.
+ */
+public class VPartitionTupleBufferManager implements IPartitionedTupleBufferManager {
+
+    private IDeallocatableFramePool framePool;
+    private IFrameBufferManager[] partitionArray;
+    private int[] numTuples;
+    private final FixedSizeFrame appendFrame;
+    private final FixedSizeFrameTupleAppender appender;
+    private BufferInfo tempInfo;
+    private final IPartitionedMemoryConstrain constrain;
+
+    public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, IPartitionedMemoryConstrain constrain,
+            int partitions, int frameLimitInBytes) throws HyracksDataException {
+        this.constrain = constrain;
+        this.framePool = new DeallocatableFramePool(ctx, frameLimitInBytes);
+        this.partitionArray = new IFrameBufferManager[partitions]; // slots stay null until the first insert
+        this.numTuples = new int[partitions];
+        this.appendFrame = new FixedSizeFrame();
+        this.appender = new FixedSizeFrameTupleAppender();
+        this.tempInfo = new BufferInfo(null, -1, -1); // scratch object reused across getFrame() calls
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        for (IFrameBufferManager part : partitionArray) {
+            if (part != null) { // a null slot has no buffer manager, so nothing to give back
+                for (int i = 0; i < part.getNumFrames(); i++) {
+                    framePool.deAllocateBuffer(part.getFrame(i, tempInfo).getBuffer());
+                }
+                part.reset();
+            }
+        }
+        Arrays.fill(numTuples, 0);
+        appendFrame.reset(null); // detach the shared append frame from the buffers handed back above
+    }
+
+    @Override
+    public int getNumPartitions() { // length of the slot array sized in the constructor
+        return partitionArray.length;
+    }
+
+    @Override
+    public int getNumTuples(int partition) { // incremented by insertTuple, zeroed by reset/clearPartition
+        return numTuples[partition];
+    }
+
+    @Override
+    public int getPhysicalSize(int partitionId) {
+        // Sum the length reported for every frame held by the partition; a partition
+        // without a buffer manager contributes nothing.
+        int total = 0;
+        final IFrameBufferManager frames = partitionArray[partitionId];
+        for (int idx = 0; frames != null && idx < frames.getNumFrames(); idx++) {
+            total += frames.getFrame(idx, tempInfo).getLength();
+        }
+        return total;
+    }
+
+    @Override
+    public void clearPartition(int partitionId) throws HyracksDataException {
+        IFrameBufferManager partition = partitionArray[partitionId];
+        if (partition != null) { // a null slot (no manager created yet) has nothing to clear
+            for (int i = 0; i < partition.getNumFrames(); ++i) {
+                framePool.deAllocateBuffer(partition.getFrame(i, tempInfo).getBuffer());
+            }
+            partition.reset(); // kept inside the null check so an untouched partition cannot trigger an NPE
+        }
+        numTuples[partitionId] = 0;
+    }
+
+    @Override
+    public boolean insertTuple(int partition, byte[] byteArray, int[] fieldEndOffsets, int start, int size,
+            TuplePointer pointer) throws HyracksDataException {
+        int actualSize = calculateActualSize(fieldEndOffsets, size);
+        int fid = getLastBufferOrCreateNewIfNotExist(partition, actualSize);
+        if (fid < 0) { // no frame could be obtained for this partition
+            return false;
+        }
+        partitionArray[partition].getFrame(fid, tempInfo);
+        int tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
+        if (tid < 0) { // the last frame rejected the tuple: try a fresh frame if the limit allows
+            if (partitionArray[partition].getNumFrames() >= constrain.frameLimit(partition)) {
+                return false;
+            }
+            fid = createNewBuffer(partition, actualSize);
+            if (fid < 0) {
+                return false;
+            }
+            partitionArray[partition].getFrame(fid, tempInfo);
+            tid = appendTupleToBuffer(tempInfo, fieldEndOffsets, byteArray, start, size);
+        }
+        pointer.reset(makeGroupFrameId(partition, fid), tid); // handle = (partition-encoded frame id, tuple index)
+        numTuples[partition]++;
+        return true;
+    }
+
+    @Override
+    public boolean insertTuple(int partition, IFrameTupleAccessor tupleAccessor, int tupleId, TuplePointer pointer)
+            throws HyracksDataException {
+        return insertTuple(partition, tupleAccessor.getBuffer().array(), null, // null: no separate field offsets
+                tupleAccessor.getTupleStartOffset(tupleId), tupleAccessor.getTupleLength(tupleId), pointer);
+    }
+
+    private static int calculateActualSize(int[] fieldEndOffsets, int size) {
+        // Without a separate offset array the required space is computed for a field
+        // count of zero; otherwise the array length is used as the field count.
+        int fieldCount = fieldEndOffsets == null ? 0 : fieldEndOffsets.length;
+        return FrameHelper.calcRequiredSpace(fieldCount, size);
+    }
+
+    private int makeGroupFrameId(int partition, int fid) { // packs (fid, partition) into a single int
+        return fid * getNumPartitions() + partition;
+    }
+
+    private int parsePartitionId(int externalFrameId) { // inverse of makeGroupFrameId: recovers the partition
+        return externalFrameId % getNumPartitions();
+    }
+
+    private int parseFrameIdInPartition(int externalFrameId) { // inverse of makeGroupFrameId: recovers fid
+        return externalFrameId / getNumPartitions();
+    }
+
+    private int createNewBuffer(int partition, int size) throws HyracksDataException {
+        ByteBuffer newBuffer = requestNewBufferFromPool(size);
+        if (newBuffer == null) { // the pool did not supply a frame
+            return -1;
+        }
+        appendFrame.reset(newBuffer); // point the shared appender at the new frame right away
+        appender.reset(appendFrame, true);
+        return partitionArray[partition].insertFrame(newBuffer); // frame id within the partition, or -1
+    }
+
+    private ByteBuffer requestNewBufferFromPool(int recordSize) throws HyracksDataException {
+        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(0, recordSize, framePool.getMinFrameSize());
+        return framePool.allocateFrame(frameSize);
+    }
+
+    private int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start, int size)
+            throws HyracksDataException {
+        assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
+        if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
+            appendFrame.reset(bufferInfo.getBuffer());
+            appender.reset(appendFrame, false);
+        }
+        if (fieldEndOffsets == null) {
+            if (appender.append(byteArray, start, size)) {
+                return appender.getTupleCount() - 1;
+            }
+        } else {
+            if (appender.append(fieldEndOffsets, byteArray, start, size)) {
+                return appender.getTupleCount() - 1;
+            }
+        }
+
+        return -1;
+    }
+
+    private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
+        if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
+            partitionArray[partition] = new PartitionFrameBufferManager();
+            return createNewBuffer(partition, actualSize);
+        }
+        return partitionArray[partition].getNumFrames() - 1;
+    }
+
+    /**
+     * Closes the underlying frame pool and clears every slot of the partition array.
+     * The per-partition managers themselves are not closed here; their references are only dropped.
+     */
+    @Override
+    public void close() {
+        framePool.close();
+        Arrays.fill(partitionArray, null);
+    }
+
+    /**
+     * Frame list of a single partition. Frames are kept in insertion order and are always exposed
+     * as whole buffers (start offset 0, length equal to the buffer capacity).
+     */
+    private class PartitionFrameBufferManager implements IFrameBufferManager {
+
+        ArrayList<ByteBuffer> buffers = new ArrayList<>();
+
+        /** Forgets all frames of this partition. */
+        @Override
+        public void reset() throws HyracksDataException {
+            buffers.clear();
+        }
+
+        /** Fills {@code returnedInfo} with the whole frame stored at {@code frameIndex} and returns it. */
+        @Override
+        public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
+            final ByteBuffer frame = buffers.get(frameIndex);
+            returnedInfo.reset(frame, 0, frame.capacity());
+            return returnedInfo;
+        }
+
+        /** @return the number of frames currently held by this partition */
+        @Override
+        public int getNumFrames() {
+            return buffers.size();
+        }
+
+        /** Appends {@code frame} (by reference, no copy) and returns its index. */
+        @Override
+        public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+            final int newIndex = buffers.size();
+            buffers.add(frame);
+            return newIndex;
+        }
+
+        /** Drops the frame list; the manager must not be used afterwards. */
+        @Override
+        public void close() {
+            buffers = null;
+        }
+
+    }
+
+    /**
+     * Creates an accessor that resolves {@link TuplePointer}s produced by this manager.
+     * Each reset decodes the pointer's external frame id into (partition, frame-in-partition),
+     * loads that frame into the shared {@code tempInfo}, and re-binds the inner frame accessor to it.
+     */
+    @Override
+    public ITuplePointerAccessor getTupleAccessor(final RecordDescriptor recordDescriptor) {
+        return new AbstractTuplePointerAccessor() {
+            private final FrameTupleAccessor innerAccessor = new FrameTupleAccessor(recordDescriptor);
+
+            @Override
+            IFrameTupleAccessor getInnerAccessor() {
+                return innerAccessor;
+            }
+
+            @Override
+            void resetInnerAccessor(TuplePointer tuplePointer) {
+                final int externalFrameId = tuplePointer.frameIndex;
+                final int partitionId = parsePartitionId(externalFrameId);
+                final int frameIdInPartition = parseFrameIdInPartition(externalFrameId);
+                partitionArray[partitionId].getFrame(frameIdInPartition, tempInfo);
+                innerAccessor.reset(tempInfo.getBuffer(), tempInfo.getStartOffset(), tempInfo.getLength());
+            }
+        };
+    }
+
+    /**
+     * Sends every frame of partition {@code pid} to {@code writer}, in frame order.
+     * Nothing is written when the partition has no manager or holds no tuples.
+     * Before a frame is handed over, its position and limit are set from the stored start offset
+     * and length.
+     */
+    @Override
+    public void flushPartition(int pid, IFrameWriter writer) throws HyracksDataException {
+        final IFrameBufferManager partition = partitionArray[pid];
+        if (partition == null || getNumTuples(pid) <= 0) {
+            return;
+        }
+        for (int frameIdx = 0; frameIdx < partition.getNumFrames(); frameIdx++) {
+            partition.getFrame(frameIdx, tempInfo);
+            final ByteBuffer frame = tempInfo.getBuffer();
+            final int begin = tempInfo.getStartOffset();
+            frame.position(begin);
+            frame.limit(begin + tempInfo.getLength());
+            writer.nextFrame(frame);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
new file mode 100644
index 0000000..4359e49
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
+import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Tuple buffer manager that supports deleting individual records in addition to inserting them.
+ * Tuples are stored in frames obtained from an {@link IFramePool}; an {@link IFrameFreeSlotPolicy}
+ * remembers which frames still have contiguous free space. Space released by deletions is only
+ * made contiguous again when the frames are re-organized during a later insert.
+ * This is only used in the {@link org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator}
+ */
+public class VariableDeletableTupleMemoryManager implements IDeletableTupleBufferManager {
+
+    private final static Logger LOG = Logger.getLogger(VariableDeletableTupleMemoryManager.class.getName());
+
+    // A frame is re-registered with the policy after an insert only if its contiguous free space
+    // exceeds this threshold (see calculateMinFreeSpace).
+    private final int MIN_FREE_SPACE;
+    private final IFramePool pool;
+    private final IFrameFreeSlotPolicy policy;
+    // Single shared accessor; every method re-points it at the frame it works on via reset(...).
+    private final IAppendDeletableFrameTupleAccessor accessor;
+    private final ArrayList<ByteBuffer> frames;
+    private final RecordDescriptor recordDescriptor;
+    private int numTuples;
+    // Number of times the frames were re-organized; reported at FINE level in close().
+    private int statsReOrg;
+
+    /**
+     * @param framePool
+     *            pool from which all frames are allocated
+     * @param recordDescriptor
+     *            descriptor of the stored tuples
+     */
+    public VariableDeletableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
+        this.pool = framePool;
+        // Upper bound on the number of frames: the whole budget split into minimum-size frames.
+        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
+        this.policy = new FrameFreeSlotLastFit(maxFrames);
+        this.accessor = new DeletableFrameTupleAppender(recordDescriptor);
+        this.frames = new ArrayList<>();
+        this.MIN_FREE_SPACE = calculateMinFreeSpace(recordDescriptor);
+        this.recordDescriptor = recordDescriptor;
+        this.numTuples = 0;
+        this.statsReOrg = 0;
+    }
+
+    /**
+     * Resets the pool and the policy, forgets all frames and sets the tuple count back to zero.
+     * The re-organization counter is not reset here.
+     */
+    @Override
+    public void reset() throws HyracksDataException {
+        pool.reset();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+    }
+
+    /** @return the number of tuples currently stored (inserts minus deletes) */
+    @Override
+    public int getNumTuples() {
+        return numTuples;
+    }
+
+    /**
+     * Inserts tuple {@code idx} of {@code fta}. A frame with enough contiguous space is looked up
+     * first; if none exists and no new frame can be allocated, the frames are re-organized when at
+     * least one of them has enough total free space.
+     *
+     * @param fta
+     *            accessor holding the tuple to insert
+     * @param idx
+     *            index of the tuple inside {@code fta}
+     * @param tuplePointer
+     *            set to (frame id, tuple id) of the stored tuple on success
+     * @return {@code true} if the tuple was stored, {@code false} if no frame can take it even after
+     *         cleaning up fragmentation
+     */
+    @Override
+    public boolean insertTuple(IFrameTupleAccessor fta, int idx, TuplePointer tuplePointer)
+            throws HyracksDataException {
+        int requiredFreeSpace = calculatePhysicalSpace(fta, idx);
+        int frameId = findAvailableFrame(requiredFreeSpace);
+        if (frameId < 0) {
+            if (canBeInsertedAfterCleanUpFragmentation(requiredFreeSpace)) {
+                reOrganizeFrames();
+                frameId = findAvailableFrame(requiredFreeSpace);
+                statsReOrg++;
+            } else {
+                return false;
+            }
+        }
+        assert frameId >= 0;
+        accessor.reset(frames.get(frameId));
+        assert accessor.getContiguousFreeSpace() >= requiredFreeSpace;
+        int tid = accessor.append(fta, idx);
+        assert tid >= 0;
+        tuplePointer.reset(frameId, tid);
+        // The frame was popped from the policy; hand it back only if the remainder is still useful.
+        if (accessor.getContiguousFreeSpace() > MIN_FREE_SPACE) {
+            policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace());
+        }
+        numTuples++;
+        return true;
+    }
+
+    /**
+     * Re-organizes every frame and rebuilds the policy from scratch with each frame's resulting
+     * contiguous free space.
+     */
+    private void reOrganizeFrames() {
+        policy.reset();
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            accessor.reOrganizeBuffer();
+            policy.pushNewFrame(i, accessor.getContiguousFreeSpace());
+        }
+    }
+
+    /**
+     * @return {@code true} if at least one frame has a total free space (contiguous or not) of at
+     *         least {@code requiredFreeSpace}
+     */
+    private boolean canBeInsertedAfterCleanUpFragmentation(int requiredFreeSpace) {
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            if (accessor.getTotalFreeSpace() >= requiredFreeSpace) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Finds a frame that can take {@code requiredFreeSpace} bytes: first by asking the policy, then
+     * by allocating (and clearing) a new frame from the pool.
+     *
+     * @return the frame id, or -1 if neither the policy nor the pool could provide one
+     */
+    private int findAvailableFrame(int requiredFreeSpace) throws HyracksDataException {
+        int frameId = policy.popBestFit(requiredFreeSpace);
+        if (frameId >= 0) {
+            return frameId;
+        }
+
+        int frameSize = calculateMinFrameSizeToPlaceTuple(requiredFreeSpace, pool.getMinFrameSize());
+        ByteBuffer buffer = pool.allocateFrame(frameSize);
+        if (buffer != null) {
+            accessor.clear(buffer);
+            frames.add(buffer);
+            return frames.size() - 1;
+        }
+        return -1;
+    }
+
+    /**
+     * Rounds {@code requiredFreeSpace + 4} up to the next multiple of {@code minFrameSize}.
+     * NOTE(review): the extra 4 bytes presumably cover per-frame bookkeeping -- confirm against
+     * DeletableFrameTupleAppender.
+     */
+    private static int calculateMinFrameSizeToPlaceTuple(int requiredFreeSpace, int minFrameSize) {
+        return (1 + (requiredFreeSpace + 4 - 1) / minFrameSize) * minFrameSize;
+    }
+
+    /** @return the tuple length plus the 4 bytes used to store its offset */
+    private static int calculatePhysicalSpace(IFrameTupleAccessor fta, int idx) {
+        // 4 bytes to store the offset
+        return 4 + fta.getTupleLength(idx);
+    }
+
+    /** @return four bytes per field of the record plus 4 bytes for the tuple offset */
+    private static int calculateMinFreeSpace(RecordDescriptor recordDescriptor) {
+        // + 4 for the tuple offset
+        return recordDescriptor.getFieldCount() * 4 + 4;
+    }
+
+    /**
+     * Deletes the tuple referenced by {@code tuplePointer} from its frame and decrements the tuple
+     * count. The free-slot policy is not updated here.
+     */
+    @Override
+    public void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException {
+        accessor.reset(frames.get(tuplePointer.frameIndex));
+        accessor.delete(tuplePointer.tupleIndex);
+        numTuples--;
+    }
+
+    /**
+     * Closes the pool, resets the policy, forgets all frames and counters, and logs the number of
+     * re-organizations at FINE level.
+     */
+    @Override
+    public void close() {
+        pool.close();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("VariableTupleMemoryManager has reorganized " + statsReOrg + " times");
+        }
+        statsReOrg = 0;
+    }
+
+    /**
+     * Creates an accessor that resolves {@link TuplePointer}s of this manager. It owns its own
+     * frame accessor, which is re-bound to the frame named by the pointer on every reset.
+     */
+    @Override
+    public ITuplePointerAccessor createTupleAccessor() {
+        return new AbstractTuplePointerAccessor() {
+            private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
+                    recordDescriptor);
+
+            @Override
+            IFrameTupleAccessor getInnerAccessor() {
+                return bufferAccessor;
+            }
+
+            @Override
+            void resetInnerAccessor(TuplePointer tuplePointer) {
+                bufferAccessor.reset(frames.get(tuplePointer.frameIndex));
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
new file mode 100644
index 0000000..fe7de23
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/**
+ * Frame buffer manager that copies inserted (logical) frames into physical frames taken from an
+ * {@link IFramePool}. Several logical frames may share one physical frame; the
+ * {@link IFrameFreeSlotPolicy} remembers how much room is left at the tail of each physical frame.
+ */
+public class VariableFrameMemoryManager implements IFrameBufferManager {
+
+    /** A physical frame together with the offset at which the next logical frame will be written. */
+    class PhysicalFrameOffset {
+        ByteBuffer physicalFrame;
+        int physicalOffset;
+
+        PhysicalFrameOffset(ByteBuffer frame, int offset) {
+            physicalFrame = frame;
+            physicalOffset = offset;
+        }
+    }
+
+    private final IFramePool framePool;
+    private final IFrameFreeSlotPolicy freeSlotPolicy;
+    private final List<PhysicalFrameOffset> physicalFrameOffsets = new ArrayList<>();
+    private final List<BufferInfo> logicalFrameStartSizes = new ArrayList<>();
+
+    /**
+     * @param framePool
+     *            source of the physical frames
+     * @param freeSlotPolicy
+     *            bookkeeping of the remaining space in partially filled physical frames
+     */
+    public VariableFrameMemoryManager(IFramePool framePool, IFrameFreeSlotPolicy freeSlotPolicy) {
+        this.framePool = framePool;
+        this.freeSlotPolicy = freeSlotPolicy;
+    }
+
+    /**
+     * Picks a physical frame with at least {@code frameSize} bytes left, preferring one known to
+     * the policy and otherwise allocating a new one (whose tuple count is initialised to 0).
+     *
+     * @return the physical frame id, or -1 if neither source could provide a frame
+     */
+    private int findAvailableFrame(int frameSize) throws HyracksDataException {
+        final int reusableId = freeSlotPolicy.popBestFit(frameSize);
+        if (reusableId >= 0) {
+            return reusableId;
+        }
+        final ByteBuffer fresh = framePool.allocateFrame(frameSize);
+        if (fresh == null) {
+            return -1;
+        }
+        IntSerDeUtils.putInt(fresh.array(), FrameHelper.getTupleCountOffset(fresh.capacity()), 0);
+        physicalFrameOffsets.add(new PhysicalFrameOffset(fresh, 0));
+        return physicalFrameOffsets.size() - 1;
+    }
+
+    /** Clears the frame tables and the free-slot policy; shared by {@link #reset()} and {@link #close()}. */
+    private void clearBookkeeping() {
+        physicalFrameOffsets.clear();
+        logicalFrameStartSizes.clear();
+        freeSlotPolicy.reset();
+    }
+
+    /** Forgets all stored frames and resets the policy and the pool. */
+    @Override
+    public void reset() throws HyracksDataException {
+        clearBookkeeping();
+        framePool.reset();
+    }
+
+    /** Copies the location (buffer, start, length) of logical frame {@code frameIndex} into {@code info}. */
+    @Override
+    public BufferInfo getFrame(int frameIndex, BufferInfo info) {
+        final BufferInfo stored = logicalFrameStartSizes.get(frameIndex);
+        info.reset(stored);
+        return info;
+    }
+
+    /** @return the number of logical frames inserted since the last reset */
+    @Override
+    public int getNumFrames() {
+        return logicalFrameStartSizes.size();
+    }
+
+    /**
+     * Copies the whole backing array range {@code [0, capacity)} of {@code frame} into a physical
+     * frame and records where it was placed.
+     *
+     * @return the logical frame index, or -1 if no physical frame with enough room is available
+     */
+    @Override
+    public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+        final int logicalSize = frame.capacity();
+        final int physicalId = findAvailableFrame(logicalSize);
+        if (physicalId < 0) {
+            return -1;
+        }
+        final PhysicalFrameOffset slot = physicalFrameOffsets.get(physicalId);
+        final ByteBuffer physical = slot.physicalFrame;
+        final int writeAt = slot.physicalOffset;
+        final int endOfCopy = writeAt + logicalSize;
+
+        System.arraycopy(frame.array(), 0, physical.array(), writeAt, logicalSize);
+        // Offer the unused tail of the physical frame for later inserts.
+        if (endOfCopy < physical.capacity()) {
+            freeSlotPolicy.pushNewFrame(physicalId, physical.capacity() - writeAt - logicalSize);
+        }
+        slot.physicalOffset = endOfCopy;
+        logicalFrameStartSizes.add(new BufferInfo(physical, writeAt, logicalSize));
+        return logicalFrameStartSizes.size() - 1;
+    }
+
+    /** Forgets all stored frames, resets the policy and closes the pool. */
+    @Override
+    public void close() {
+        clearBookkeeping();
+        framePool.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java
new file mode 100644
index 0000000..f83eb8b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFramePool.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class VariableFramePool implements IFramePool {
+    public static final int UNLIMITED_MEMORY = -1;
+
+    private final IHyracksFrameMgrContext ctx;
+    private final int minFrameSize;
+    private final int memBudget;
+
+    private int allocateMem;
+    private ArrayList<ByteBuffer> buffers; // the unused slots were sorted by size increasingly.
+    private BitSet used; // the merged one also marked as used.
+
+    /**
+     * The constructor of the VariableFramePool.
+     *
+     * @param ctx
+     * @param memBudgetInBytes
+     *            the given memory budgets to allocate the frames. If it less than 0, it will be treated as unlimited budgets
+     */
+    public VariableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) {
+        this.ctx = ctx;
+        this.minFrameSize = ctx.getInitialFrameSize();
+        this.allocateMem = 0;
+        if (memBudgetInBytes == UNLIMITED_MEMORY) {
+            this.memBudget = Integer.MAX_VALUE;
+            this.buffers = new ArrayList<>();
+            this.used = new BitSet();
+        } else {
+            this.memBudget = memBudgetInBytes;
+            this.buffers = new ArrayList<>(memBudgetInBytes / minFrameSize);
+            this.used = new BitSet(memBudgetInBytes / minFrameSize);
+        }
+    }
+
+    @Override
+    public int getMinFrameSize() {
+        return minFrameSize;
+    }
+
+    @Override
+    public int getMemoryBudgetBytes() {
+        return memBudget;
+    }
+
+    @Override
+    public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException {
+        int frameId = findExistingFrame(frameSize);
+        if (frameId >= 0) {
+            return reuseFrame(frameId);
+        }
+        if (haveEnoughFreeSpace(frameSize)) {
+            return createNewFrame(frameSize);
+        }
+        return mergeExistingFrames(frameSize);
+
+    }
+
+    private boolean haveEnoughFreeSpace(int frameSize) {
+        return frameSize + allocateMem <= memBudget;
+    }
+
+    private static int getFirstUnusedPos(BitSet used) {
+        return used.nextClearBit(0);
+    }
+
+    private static int getLastUnusedPos(BitSet used, int lastPos) {
+        return used.previousClearBit(lastPos);
+    }
+
+    private static int binarySearchUnusedBuffer(ArrayList<ByteBuffer> buffers, BitSet used, int frameSize) {
+        int l = getFirstUnusedPos(used); // to skip the merged null buffers
+        int h = getLastUnusedPos(used, (buffers.size() - 1)) + 1; // to skip the newly created buffers
+        if (l >= h) {
+            return -1;
+        }
+        int highest = h;
+        int mid = (l + h) / 2;
+        while (l < h) {
+            ByteBuffer buffer = buffers.get(mid);
+            if (buffer.capacity() == frameSize) {
+                break;
+            }
+            if (buffer.capacity() < frameSize) {
+                l = mid + 1;
+            } else {
+                h = mid;
+            }
+            mid = (l + h) / 2;
+        }
+        mid = used.nextClearBit(mid);
+        return mid < highest ? mid : -1;
+    }
+
+    private int findExistingFrame(int frameSize) {
+        return binarySearchUnusedBuffer(buffers, used, frameSize);
+    }
+
+    private ByteBuffer reuseFrame(int id) {
+        used.set(id);
+        buffers.get(id).clear();
+        return buffers.get(id);
+    }
+
+    private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException {
+        buffers.add(ctx.allocateFrame(frameSize));
+        allocateMem += frameSize;
+        return reuseFrame(buffers.size() - 1);
+    }
+
+    /**
+     * The merging sequence is from the smallest to the largest order.
+     * Once the buffer get merged, it will be remove from the list in order to free the object.
+     * And the index spot of it will be marked as used.
+     *
+     * @param frameSize
+     * @return
+     * @throws HyracksDataException
+     */
+    private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException {
+        int mergedSize = memBudget - allocateMem;
+        int highBound = getLastUnusedPos(used, buffers.size() - 1) + 1;
+        for (int i = getFirstUnusedPos(used); i < highBound; ++i) {
+            if (!used.get(i)) {
+                mergedSize += deAllocateFrame(i);
+                if (mergedSize >= frameSize) {
+                    return createNewFrame(mergedSize);
+                }
+            }
+        }
+        return null;
+    }
+
+    private int deAllocateFrame(int id) {
+        ByteBuffer frame = buffers.get(id);
+        ctx.deallocateFrames(frame.capacity());
+        buffers.set(id, null);
+        used.set(id);
+        allocateMem -= frame.capacity();
+        return frame.capacity();
+    }
+
+    @Override
+    public void reset() {
+        removeEmptySpot(buffers);
+        Collections.sort(buffers, sizeByteBufferComparator);
+        used.clear();
+    }
+
+    private static void removeEmptySpot(List<ByteBuffer> buffers) {
+        for (int i = 0; i < buffers.size();) {
+            if (buffers.get(i) == null) {
+                buffers.remove(i);
+            } else {
+                i++;
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        buffers.clear();
+        used.clear();
+        allocateMem = 0;
+    }
+
+    private static Comparator<ByteBuffer> sizeByteBufferComparator = new Comparator<ByteBuffer>() {
+        @Override
+        public int compare(ByteBuffer o1, ByteBuffer o2) {
+            if (o1.capacity() == o2.capacity()) {
+                return 0;
+            }
+            return o1.capacity() < o2.capacity() ? -1 : 1;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
deleted file mode 100644
index 06c46b0..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractRunningAggregatorDescriptor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public abstract class AbstractRunningAggregatorDescriptor implements IAggregatorDescriptor {
-
-    /* (non-Javadoc)
-     * @see org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor#outputPartialResult(org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, org.apache.hyracks.api.comm.IFrameTupleAccessor, int, org.apache.hyracks.dataflow.std.group.AggregateState)
-     */
-    @Override
-    public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException {
-        return false;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor#outputFinalResult(org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder, org.apache.hyracks.api.comm.IFrameTupleAccessor, int, org.apache.hyracks.dataflow.std.group.AggregateState)
-     */
-    @Override
-    public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
-            AggregateState state) throws HyracksDataException {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java
new file mode 100644
index 0000000..830ab89
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AggregateType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.group;
+
+/**
+ * Distinguishes the two kinds of aggregate output.
+ * NOTE(review): presumably this selects between the partial-result and final-result output paths
+ * of the group-by aggregators -- confirm against the callers.
+ */
+public enum AggregateType {
+    PARTIAL,
+    FINAL
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/6abc63e2/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java
deleted file mode 100644
index 7416677..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/FrameToolsForGroupers.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.group;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.FrameHelper;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-/**
- *
- */
-public class FrameToolsForGroupers {
-
-    public static void writeFields(byte[] buf, int offset, int length, ArrayTupleBuilder tupleBuilder)
-            throws HyracksDataException {
-        writeFields(buf, offset, length, tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                tupleBuilder.getSize());
-    }
-
-    public static void writeFields(byte[] buf, int offset, int length, int[] fieldsOffset, byte[] data, int dataOffset,
-            int dataLength) throws HyracksDataException {
-        if (dataLength + 4 * fieldsOffset.length > length) {
-            throw new HyracksDataException("Out of buffer bound: try to write too much data (" + dataLength
-                    + ") to the given bound (" + length + ").");
-        }
-
-        ByteBuffer buffer = ByteBuffer.wrap(buf, offset, length);
-        for (int i = 0; i < fieldsOffset.length; i++) {
-            buffer.putInt(fieldsOffset[i]);
-        }
-        buffer.put(data, dataOffset, dataLength);
-    }
-
-    public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength) throws HyracksDataException {
-        int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
-        int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
-                * currentTupleCount);
-        int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
-
-        // update tuple end offset
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
-                newTupleEndOffset);
-        // Update the tuple count
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
-    }
-
-    public static void updateFrameMetaForNewTuple(ByteBuffer buffer, int addedTupleLength, boolean isReset)
-            throws HyracksDataException {
-        int currentTupleCount;
-        int currentTupleEndOffset;
-        if (isReset) {
-            currentTupleCount = 0;
-            currentTupleEndOffset = 0;
-        } else {
-            currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
-            currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
-                    * currentTupleCount);
-        }
-        int newTupleEndOffset = currentTupleEndOffset + addedTupleLength;
-
-        // update tuple end offset
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4 * (currentTupleCount + 1),
-                newTupleEndOffset);
-        // Update the tuple count
-        buffer.putInt(FrameHelper.getTupleCountOffset(buffer.capacity()), currentTupleCount + 1);
-    }
-
-    public static boolean isFrameOverflowing(ByteBuffer buffer, int length, boolean isReset)
-            throws HyracksDataException {
-
-        int currentTupleCount = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()));
-        if (currentTupleCount == 0 || isReset) {
-            return length + 4 + 4 > buffer.capacity();
-        }
-        int currentTupleEndOffset = buffer.getInt(FrameHelper.getTupleCountOffset(buffer.capacity()) - 4
-                * currentTupleCount);
-        return currentTupleEndOffset + length + 4 + (currentTupleCount + 1) * 4 > buffer.capacity();
-    }
-}