You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:22 UTC

[05/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
new file mode 100644
index 0000000..8f8518c
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.ITupleBufferAccessor;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.ITupleBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparable;
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparableFactory;
+import edu.uci.ics.hyracks.dataflow.std.structures.MaxHeap;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+public class TupleSorterHeapSort implements ITupleSorter {
+
+    private static final Logger LOGGER = Logger.getLogger(TupleSorterHeapSort.class.getName());
+
+    class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> {
+        @Override
+        public IResetableComparable<HeapEntry> createResetableComparable() {
+            return new HeapEntry();
+        }
+    }
+
+    class HeapEntry implements IResetableComparable<HeapEntry> {
+        int nmk;
+        TuplePointer tuplePointer;
+
+        public HeapEntry() {
+            tuplePointer = new TuplePointer();
+            nmk = 0;
+        }
+
+        @Override
+        public int compareTo(HeapEntry o) {
+            if (nmk != o.nmk) {
+                return ((((long) nmk) & 0xffffffffL) < (((long) o.nmk) & 0xffffffffL)) ? -1 : 1;
+            }
+            bufferAccessor1.reset(tuplePointer);
+            bufferAccessor2.reset(o.tuplePointer);
+            byte[] b1 = bufferAccessor1.getTupleBuffer().array();
+            byte[] b2 = bufferAccessor2.getTupleBuffer().array();
+
+            for (int f = 0; f < comparators.length; ++f) {
+                int fIdx = sortFields[f];
+                int s1 = bufferAccessor1.getAbsFieldStartOffset(fIdx);
+                int l1 = bufferAccessor1.getFieldLength(fIdx);
+
+                int s2 = bufferAccessor2.getAbsFieldStartOffset(fIdx);
+                int l2 = bufferAccessor2.getFieldLength(fIdx);
+                int c;
+                try {
+                    c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                } catch (HyracksDataException e) {
+                    throw new IllegalStateException(e);
+                }
+                if (c != 0) {
+                    return c;
+                }
+            }
+            return 0;
+        }
+
+        public void reset(int nmkey) {
+            nmk = nmkey;
+        }
+
+        @Override
+        public void reset(HeapEntry other) {
+            nmk = other.nmk;
+            tuplePointer.reset(other.tuplePointer);
+        }
+    }
+
+    private final ITupleBufferManager bufferManager;
+    private final ITupleBufferAccessor bufferAccessor1;
+    private final ITupleBufferAccessor bufferAccessor2;
+    private final int topK;
+    private final FrameTupleAppender outputAppender;
+    private final IFrame outputFrame;
+    private final int[] sortFields;
+    private final INormalizedKeyComputer nkc;
+    private final IBinaryComparator[] comparators;
+
+    private HeapEntry maxEntry;
+    private HeapEntry newEntry;
+
+    private MaxHeap heap;
+    private boolean isSorted;
+
+    public TupleSorterHeapSort(IHyracksTaskContext ctx, ITupleBufferManager bufferManager, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories)
+            throws HyracksDataException {
+        this.bufferManager = bufferManager;
+        this.bufferAccessor1 = bufferManager.getTupleAccessor();
+        this.bufferAccessor2 = bufferManager.getTupleAccessor();
+        this.topK = topK;
+        this.outputFrame = new VSizeFrame(ctx);
+        this.outputAppender = new FrameTupleAppender();
+        this.sortFields = sortFields;
+        this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+        this.comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+
+        this.heap = new MaxHeap(new HeapEntryFactory(), topK);
+        this.maxEntry = new HeapEntry();
+        this.newEntry = new HeapEntry();
+        this.isSorted = false;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return heap.getNumEntries();
+    }
+
+    @Override
+    public boolean insertTuple(IFrameTupleAccessor frameTupleAccessor, int index) throws HyracksDataException {
+        if (isSorted) {
+            throw new HyracksDataException(
+                    "The Heap haven't be reset after sorting, the order of using this class is not correct.");
+        }
+        int nmkey = getPNK(frameTupleAccessor, index);
+        if (heap.getNumEntries() >= topK) {
+            heap.peekMax(maxEntry);
+            if (compareTuple(frameTupleAccessor, index, nmkey, maxEntry) >= 0) {
+                return true;
+            }
+        }
+
+        newEntry.reset(nmkey);
+        if (!bufferManager.insertTuple(frameTupleAccessor, index, newEntry.tuplePointer)) {
+            return false;
+        }
+        if (heap.getNumEntries() < topK) {
+            heap.insert(newEntry);
+        } else {
+            bufferManager.deleteTuple(maxEntry.tuplePointer);
+            heap.replaceMax(newEntry);
+        }
+        return true;
+    }
+
+    private int getPNK(IFrameTupleAccessor fta, int tIx) {
+        if (nkc == null) {
+            return 0;
+        }
+        int sfIdx = sortFields[0];
+        return nkc.normalize(fta.getBuffer().array(), fta.getAbsoluteFieldStartOffset(tIx, sfIdx),
+                fta.getFieldLength(tIx, sfIdx));
+    }
+
+    private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int nmkey, HeapEntry maxEntry)
+            throws HyracksDataException {
+        if (nmkey != maxEntry.nmk) {
+            return ((((long) nmkey) & 0xffffffffL) < (((long) maxEntry.nmk) & 0xffffffffL)) ? -1 : 1;
+        }
+        bufferAccessor2.reset(maxEntry.tuplePointer);
+        byte[] b1 = frameTupleAccessor.getBuffer().array();
+        byte[] b2 = bufferAccessor2.getTupleBuffer().array();
+
+        for (int f = 0; f < comparators.length; ++f) {
+            int fIdx = sortFields[f];
+            int s1 = frameTupleAccessor.getAbsoluteFieldStartOffset(tid, fIdx);
+            int l1 = frameTupleAccessor.getFieldLength(tid, fIdx);
+
+            int s2 = bufferAccessor2.getAbsFieldStartOffset(fIdx);
+            int l2 = bufferAccessor2.getFieldLength(fIdx);
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return getTupleCount() > 0;
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        bufferManager.reset();
+        heap.reset();
+        isSorted = false;
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public void sort() throws HyracksDataException {
+        IResetableComparable[] entries = heap.getEntries();
+        int count = heap.getNumEntries();
+        Arrays.sort(entries, 0, count, entryComparator);
+        isSorted = true;
+    }
+
+    private static final Comparator<IResetableComparable> entryComparator = new Comparator<IResetableComparable>() {
+        @Override
+        public int compare(IResetableComparable o1, IResetableComparable o2) {
+            return o1.compareTo(o2);
+        }
+    };
+
+    @Override
+    public void close() {
+        heap = null;
+        bufferManager.close();
+        isSorted = false;
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public int flush(IFrameWriter writer) throws HyracksDataException {
+        outputAppender.reset(outputFrame, true);
+        int maxFrameSize = outputFrame.getFrameSize();
+        int numEntries = heap.getNumEntries();
+        IResetableComparable[] entries = heap.getEntries();
+        int io = 0;
+        for (int i = 0; i < numEntries; i++) {
+            HeapEntry minEntry = (HeapEntry) entries[i];
+            bufferAccessor1.reset(minEntry.tuplePointer);
+            int flushed = FrameUtils
+                    .appendToWriter(writer, outputAppender, bufferAccessor1.getTupleBuffer().array(),
+                            bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
+            if (flushed > 0) {
+                maxFrameSize = Math.max(maxFrameSize, flushed);
+                io++;
+            }
+        }
+        maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
+        outputAppender.flush(writer, true);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(
+                    "Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
+        }
+        return maxFrameSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java
new file mode 100644
index 0000000..26da494
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+/**
+ * Names the available strategies for choosing a frame with free space.
+ * NOTE(review): the constants presumably map to FrameFreeSlotSmallestFit, FrameFreeSlotLastFit and
+ * FrameFreeSlotBiggestFirst respectively - confirm against the code that consumes this enum.
+ */
+public enum EnumFreeSlotPolicy {
+    SMALLEST_FIT,
+    LAST_FIT,
+    BIGGEST_FIT,
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java
new file mode 100644
index 0000000..085a1e8
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparable;
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparableFactory;
+import edu.uci.ics.hyracks.dataflow.std.structures.MaxHeap;
+
+/**
+ * Free-slot policy that only ever offers the frame with the most free space: the registered
+ * frames are kept in a {@link MaxHeap} ordered by free space, and {@link #popBestFit(int)}
+ * looks at the heap maximum alone.
+ */
+public class FrameFreeSlotBiggestFirst implements IFrameFreeSlotPolicy {
+    private static final int INVALID = -1;
+
+    /** Factory handed to the {@link MaxHeap} so that it can create {@link SpaceEntry} instances. */
+    static class SpaceEntryFactory implements IResetableComparableFactory<SpaceEntry> {
+        @Override
+        public IResetableComparable<SpaceEntry> createResetableComparable() {
+            return new SpaceEntry();
+        }
+    }
+
+    /** A (free space, frame id) pair ordered by free space; {@link #INVALID} sorts below every other value. */
+    static class SpaceEntry implements IResetableComparable<SpaceEntry> {
+        int space;
+        int id;
+
+        SpaceEntry() {
+            space = INVALID;
+            id = INVALID;
+        }
+
+        @Override
+        public int compareTo(SpaceEntry o) {
+            if (o.space != space) {
+                if (o.space == INVALID) {
+                    return 1;
+                }
+                if (space == INVALID) {
+                    return -1;
+                }
+                return space < o.space ? -1 : 1;
+            }
+            return 0;
+        }
+
+        @Override
+        public void reset(SpaceEntry other) {
+            space = other.space;
+            id = other.id;
+        }
+
+        void reset(int space, int id) {
+            this.space = space;
+            this.id = id;
+        }
+    }
+
+    private final MaxHeap heap;
+    /** Scratch entry reused for every heap read and insert. */
+    private final SpaceEntry tempEntry;
+
+    /**
+     * @param initialCapacity capacity passed to the underlying {@link MaxHeap}
+     */
+    public FrameFreeSlotBiggestFirst(int initialCapacity) {
+        heap = new MaxHeap(new SpaceEntryFactory(), initialCapacity);
+        tempEntry = new SpaceEntry();
+    }
+
+    /**
+     * Returns the id of the frame with the most free space when that space is at least
+     * {@code tobeInsertedSize}; otherwise returns -1 and leaves the heap untouched.
+     */
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        if (!heap.isEmpty()) {
+            heap.peekMax(tempEntry);
+            if (tempEntry.space >= tobeInsertedSize) {
+                // NOTE(review): getMax presumably removes the maximum from the heap - confirm in MaxHeap.
+                heap.getMax(tempEntry);
+                return tempEntry.id;
+            }
+        }
+        return INVALID;
+    }
+
+    /** Registers {@code frameID} as having {@code freeSpace} bytes available. */
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        tempEntry.reset(freeSpace, frameID);
+        heap.insert(tempEntry);
+    }
+
+    /** Resets the underlying heap. */
+    @Override
+    public void reset() {
+        heap.reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java
new file mode 100644
index 0000000..0bfcf38
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.util.Arrays;
+
+/**
+ * Free-slot policy that scans the registered frames from the most recently pushed one backwards
+ * and hands out the first frame whose free space is large enough.
+ */
+public class FrameFreeSlotLastFit implements IFrameFreeSlotPolicy {
+    private static final int INITIAL_CAPACITY = 10;
+
+    /** Mutable (frame id, free space) record; instances are recycled by {@link #pushNewFrame}. */
+    private static class FrameSpace {
+        int frameId;
+        int freeSpace;
+
+        FrameSpace(int frameId, int freeSpace) {
+            reset(frameId, freeSpace);
+        }
+
+        void reset(int frameId, int freeSpace) {
+            this.frameId = frameId;
+            this.freeSpace = freeSpace;
+        }
+    }
+
+    /** Slots {@code [0, size)} are live; slots at or beyond {@code size} may hold recyclable records. */
+    private FrameSpace[] frameSpaces;
+    private int size;
+
+    /**
+     * @param maxFrames initial capacity of the slot array; the array grows on demand, so this is
+     *                  not a hard limit. A value of 0 is accepted.
+     */
+    public FrameFreeSlotLastFit(int maxFrames) {
+        frameSpaces = new FrameSpace[maxFrames];
+        size = 0;
+    }
+
+    public FrameFreeSlotLastFit() {
+        this(INITIAL_CAPACITY);
+    }
+
+    /**
+     * Removes and returns the id of the most recently pushed frame whose free space is at least
+     * {@code tobeInsertedSize}, or -1 when no registered frame is large enough.
+     */
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        for (int i = size - 1; i >= 0; i--) {
+            if (frameSpaces[i].freeSpace >= tobeInsertedSize) {
+                FrameSpace ret = frameSpaces[i];
+                // Close the gap, then park the removed record just past the live range for reuse.
+                System.arraycopy(frameSpaces, i + 1, frameSpaces, i, size - i - 1);
+                frameSpaces[--size] = ret;
+                return ret.frameId;
+            }
+        }
+        return -1;
+    }
+
+    /** Registers {@code frameID} as having {@code freeSpace} bytes available, growing the array if needed. */
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        if (size >= frameSpaces.length) {
+            // Doubling a zero-length array would leave it empty, so always grow to at least one slot.
+            frameSpaces = Arrays.copyOf(frameSpaces, Math.max(1, size * 2));
+        }
+        if (frameSpaces[size] == null) {
+            frameSpaces[size++] = new FrameSpace(frameID, freeSpace);
+        } else {
+            frameSpaces[size++].reset(frameID, freeSpace);
+        }
+    }
+
+    /** Forgets every registered frame and drops all cached records. */
+    @Override
+    public void reset() {
+        size = 0;
+        Arrays.fill(frameSpaces, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java
new file mode 100644
index 0000000..69e1911
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Free-slot policy that hands out the frame with the smallest free space that is still large
+ * enough. Frames are indexed by their free space in a sorted map; frames sharing the same free
+ * space are handed out in the order they were registered.
+ */
+public class FrameFreeSlotSmallestFit implements IFrameFreeSlotPolicy {
+
+    /** Free space -> ids of the frames that currently have exactly that much space. */
+    private TreeMap<Integer, LinkedList<Integer>> freeSpaceIndex;
+
+    public FrameFreeSlotSmallestFit() {
+        freeSpaceIndex = new TreeMap<>();
+    }
+
+    /**
+     * Removes and returns the id of a frame whose free space is the smallest value that is at
+     * least {@code tobeInsertedSize}, or -1 when no such frame is registered.
+     */
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        final Map.Entry<Integer, LinkedList<Integer>> candidate = freeSpaceIndex.ceilingEntry(tobeInsertedSize);
+        if (candidate == null) {
+            return -1;
+        }
+        final LinkedList<Integer> frameIds = candidate.getValue();
+        final int chosen = frameIds.removeFirst();
+        if (frameIds.isEmpty()) {
+            // Never leave an empty bucket behind: ceilingEntry would find it and removeFirst would fail.
+            freeSpaceIndex.remove(candidate.getKey());
+        }
+        return chosen;
+    }
+
+    /** Registers {@code frameID} under the bucket for exactly {@code freeSpace} bytes. */
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        LinkedList<Integer> bucket = freeSpaceIndex.get(freeSpace);
+        if (bucket == null) {
+            bucket = new LinkedList<>();
+            freeSpaceIndex.put(freeSpace, bucket);
+        }
+        bucket.add(frameID);
+    }
+
+    /** Forgets every registered frame. */
+    @Override
+    public void reset() {
+        freeSpaceIndex.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
new file mode 100644
index 0000000..9a52efa
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Manages a set of in-memory frames that are addressed by an integer frame index.
+ */
+public interface IFrameBufferManager {
+
+    /**
+     * Resets the counters and flags to their initial status. This method should not release the
+     * pre-allocated resources.
+     *
+     * @throws edu.uci.ics.hyracks.api.exceptions.HyracksDataException
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * @param frameIndex index of the requested frame
+     * @return the buffer holding the specified frame, from the set of memory buffers being
+     * managed by this memory manager
+     */
+    ByteBuffer getFrame(int frameIndex);
+
+    /**
+     * Gets the start offset of the specified frame inside its buffer.
+     *
+     * @param frameIndex index of the requested frame
+     * @return the start offset of the frame within the buffer returned by {@link #getFrame(int)}
+     */
+    int getFrameStartOffset(int frameIndex);
+
+    /**
+     * Gets the size of the specified frame inside its buffer.
+     *
+     * @param frameIndex index of the requested frame
+     * @return the length of the specified frame
+     */
+    int getFrameSize(int frameIndex);
+
+    /**
+     * @return the number of frames in this buffer
+     */
+    int getNumFrames();
+
+    /**
+     * Writes the whole frame into the buffer.
+     *
+     * @param frame source frame
+     * @return the id of the inserted frame, or -1 if the frame could not be inserted
+     */
+    int insertFrame(ByteBuffer frame) throws HyracksDataException;
+
+    /**
+     * Closes this manager.
+     * NOTE(review): presumably releases the pre-allocated resources that {@link #reset()} keeps -
+     * confirm against the implementations.
+     */
+    void close();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
new file mode 100644
index 0000000..57a8094
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+/**
+ * Index of frames that still have free space, used to pick a frame for data of a given size.
+ */
+public interface IFrameFreeSlotPolicy {
+
+    /**
+     * Finds the best-fit frame id which can hold the data, and then pops it out of the index.
+     * Returns -1 if no registered frame can hold the data.
+     *
+     * @param tobeInsertedSize the actual size of the data which should include
+     *                         the meta data like the field offset and the tuple
+     *                         count extra size
+     * @return the best fit frame id, or -1 if none was found
+     */
+    int popBestFit(int tobeInsertedSize);
+
+    /**
+     * Registers a new free slot in the index.
+     *
+     * @param frameID   id of the frame that has free space
+     * @param freeSpace amount of free space available in that frame
+     */
+    void pushNewFrame(int frameID, int freeSpace);
+
+    /**
+     * Clears all the existing free slot information.
+     */
+    void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
new file mode 100644
index 0000000..1e5be25
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Source of frames (as {@link ByteBuffer}s) of caller-specified size.
+ */
+public interface IFramePool {
+
+    /**
+     * NOTE(review): presumably the smallest frame size this pool hands out, in bytes - confirm
+     * against the implementations.
+     */
+    int getMinFrameSize();
+
+    /**
+     * NOTE(review): presumably the total number of bytes this pool is allowed to allocate -
+     * confirm against the implementations.
+     */
+    int getMemoryBudgetBytes();
+
+    /**
+     * Gets a frame of the given size. <br>
+     * Returns {@code null} if it fails to allocate a frame of the required size.
+     *
+     * @param frameSize the actual size of the frame.
+     * @return the allocated frame, or {@code null} on allocation failure
+     * @throws HyracksDataException
+     */
+    ByteBuffer allocateFrame(int frameSize) throws HyracksDataException;
+
+    /**
+     * Resets the counters to their initial status. This method should not release the pre-allocated resources.
+     */
+    void reset();
+
+    /**
+     * Releases the pre-allocated resources.
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
new file mode 100644
index 0000000..49a664c
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Cursor over a single tuple stored by an {@link ITupleBufferManager}. The accessor is pointed
+ * at a tuple with {@link #reset(TuplePointer)}; the getters then describe that tuple.
+ */
+public interface ITupleBufferAccessor {
+
+    /**
+     * Points this accessor at the tuple identified by {@code tuplePointer}.
+     *
+     * @param tuplePointer pointer to the tuple the following getter calls should describe
+     */
+    void reset(TuplePointer tuplePointer);
+
+    /**
+     * @return the buffer that contains the current tuple; TupleSorterHeapSort reads the tuple
+     * through this buffer's backing array
+     */
+    ByteBuffer getTupleBuffer();
+
+    /**
+     * @return the start offset of the current tuple; TupleSorterHeapSort uses it as an index
+     * into the backing array of {@link #getTupleBuffer()}
+     */
+    int getTupleStartOffset();
+
+    /**
+     * @return the length of the current tuple
+     */
+    int getTupleLength();
+
+    /**
+     * @param fieldId index of the field within the current tuple
+     * @return the start offset of the field; TupleSorterHeapSort uses it as an index into the
+     * backing array of {@link #getTupleBuffer()}
+     */
+    int getAbsFieldStartOffset(int fieldId);
+
+    /**
+     * @param fieldId index of the field within the current tuple
+     * @return the length of the field
+     */
+    int getFieldLength(int fieldId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
new file mode 100644
index 0000000..6f94563
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+/** Manages memory for individual tuples and identifies every stored tuple by a {@link TuplePointer}. */
+public interface ITupleBufferManager {
+    /**
+     * Resets the counters and flags to their initial state. This method should not release the pre-allocated resources.
+     *
+     * @throws edu.uci.ics.hyracks.api.exceptions.HyracksDataException if the implementation fails to reset its state
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * @return the number of tuples in this buffer
+     */
+    int getNumTuples();
+    /** Stores tuple {@code idx} of {@code accessor} and points {@code tuplePointer} at the copy; {@code false} if not stored. */
+    boolean insertTuple(IFrameTupleAccessor accessor, int idx, TuplePointer tuplePointer) throws HyracksDataException;
+    /** Deletes the tuple that {@code tuplePointer} refers to. */
+    void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException;
+    /** Closes this manager. NOTE(review): unlike {@link #reset()}, this presumably may release resources -- confirm. */
+    void close();
+    /** Returns an accessor for reading the tuples held by this manager. */
+    ITupleBufferAccessor getTupleAccessor();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
new file mode 100644
index 0000000..834ba2a
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.FixedSizeFrame;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+/** Packs variable-sized logical frames back to back into physical frames obtained from an {@link IFramePool}. */
+public class VariableFrameMemoryManager implements IFrameBufferManager {
+    /** A physical frame plus the offset of its first unoccupied byte; static because it needs no outer instance. */
+    private static final class PhysicalFrameOffset {
+        final IFrame physicalFrame;
+        int physicalOffset;
+        PhysicalFrameOffset(IFrame frame, int offset) {
+            physicalFrame = frame;
+            physicalOffset = offset;
+        }
+    }
+
+    /** Where one inserted (logical) frame lives: the physical buffer, the start offset inside it, and its size. */
+    private static final class LogicalFrameStartSize {
+        final ByteBuffer logicalFrame;
+        final int logicalStart;
+        final int logicalSize;
+        LogicalFrameStartSize(ByteBuffer frame, int start, int size) {
+            logicalFrame = frame;
+            logicalStart = start;
+            logicalSize = size;
+        }
+    }
+
+    private final IFramePool framePool;
+    private final List<PhysicalFrameOffset> physicalFrameOffsets;
+    private final List<LogicalFrameStartSize> logicalFrameStartSizes;
+    private final IFrameFreeSlotPolicy freeSlotPolicy;
+    /** Creates a manager that draws frames from {@code framePool} and tracks leftover space in {@code freeSlotPolicy}. */
+    public VariableFrameMemoryManager(IFramePool framePool, IFrameFreeSlotPolicy freeSlotPolicy) {
+        this.framePool = framePool;
+        this.freeSlotPolicy = freeSlotPolicy;
+        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
+        this.physicalFrameOffsets = new ArrayList<>(maxFrames);
+        this.logicalFrameStartSizes = new ArrayList<>(maxFrames);
+    }
+    /** Returns the id of a physical frame with room for {@code frameSize} bytes, allocating one if needed; -1 if none. */
+    private int findAvailableFrame(int frameSize) throws HyracksDataException {
+        int frameId = freeSlotPolicy.popBestFit(frameSize);
+        if (frameId >= 0) {
+            return frameId;
+        }
+        ByteBuffer buffer = framePool.allocateFrame(frameSize);
+        if (buffer == null) {
+            return -1;
+        }
+        IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), 0);
+        physicalFrameOffsets.add(new PhysicalFrameOffset(new FixedSizeFrame(buffer), 0));
+        return physicalFrameOffsets.size() - 1;
+    }
+    /** Forgets every inserted frame and resets the free-slot policy and the pool (the pool is reset, not closed). */
+    @Override
+    public void reset() throws HyracksDataException {
+        physicalFrameOffsets.clear();
+        logicalFrameStartSizes.clear();
+        freeSlotPolicy.reset();
+        framePool.reset();
+    }
+    /** Returns the physical buffer containing logical frame {@code frameIndex}; see {@link #getFrameStartOffset(int)}. */
+    @Override
+    public ByteBuffer getFrame(int frameIndex) {
+        return logicalFrameStartSizes.get(frameIndex).logicalFrame;
+    }
+    /** Returns the offset inside the physical buffer at which logical frame {@code frameIndex} starts. */
+    @Override
+    public int getFrameStartOffset(int frameIndex) {
+        return logicalFrameStartSizes.get(frameIndex).logicalStart;
+    }
+    /** Returns the size of logical frame {@code frameIndex}, i.e. the capacity of the buffer that was inserted. */
+    @Override
+    public int getFrameSize(int frameIndex) {
+        return logicalFrameStartSizes.get(frameIndex).logicalSize;
+    }
+    /** Returns the number of logical frames inserted since construction or the last reset. */
+    @Override
+    public int getNumFrames() {
+        return logicalFrameStartSizes.size();
+    }
+    /** Copies the whole backing array of {@code frame} into managed memory; returns its logical index, or -1 if full. */
+    @Override
+    public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+        int frameSize = frame.capacity();
+        int physicalFrameId = findAvailableFrame(frameSize);
+        if (physicalFrameId < 0) {
+            return -1;
+        }
+        PhysicalFrameOffset target = physicalFrameOffsets.get(physicalFrameId);
+        ByteBuffer buffer = target.physicalFrame.getBuffer();
+        int offset = target.physicalOffset;
+        System.arraycopy(frame.array(), 0, buffer.array(), offset, frameSize);
+        target.physicalOffset = offset + frameSize;
+        if (target.physicalOffset < buffer.capacity()) {
+            freeSlotPolicy.pushNewFrame(physicalFrameId, buffer.capacity() - target.physicalOffset);
+        }
+        logicalFrameStartSizes.add(new LogicalFrameStartSize(buffer, offset, frameSize));
+        return logicalFrameStartSizes.size() - 1;
+    }
+    /** Forgets every inserted frame, resets the free-slot policy and closes the pool. */
+    @Override
+    public void close() {
+        physicalFrameOffsets.clear();
+        logicalFrameStartSizes.clear();
+        freeSlotPolicy.reset();
+        framePool.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
new file mode 100644
index 0000000..0d936b4
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksFrameMgrContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+/** An {@link IFramePool} handing out variable-sized frames within a byte budget; frames are reusable after reset(). */
+public class VariableFramePool implements IFramePool {
+    public static final int UNLIMITED_MEMORY = -1;
+
+    private final IHyracksFrameMgrContext ctx;
+    private final int minFrameSize;
+    private final int memBudget;
+
+    private int allocateMem; // bytes currently held by the frames in buffers
+    private final List<ByteBuffer> buffers;  // the unused slots were sorted by size increasingly.
+    private final BitSet used; // the merged one also marked as used.
+
+    /**
+     * The constructor of the VariableFramePool.
+     *
+     * @param ctx the context that allocates and deallocates the frames and supplies the minimum frame size
+     * @param memBudgetInBytes the memory budget in bytes for the frames; any negative value means an unlimited budget
+     */
+    public VariableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) {
+        this.ctx = ctx;
+        this.minFrameSize = ctx.getInitialFrameSize();
+        this.allocateMem = 0;
+        if (memBudgetInBytes < 0) { // every negative budget is unlimited, as documented -- not only UNLIMITED_MEMORY
+            this.memBudget = Integer.MAX_VALUE;
+            this.buffers = new ArrayList<>();
+            this.used = new BitSet();
+        } else {
+            this.memBudget = memBudgetInBytes;
+            this.buffers = new ArrayList<>(memBudgetInBytes / minFrameSize);
+            this.used = new BitSet(memBudgetInBytes / minFrameSize);
+        }
+    }
+    /** Returns the minimum frame size, taken from the context's initial frame size at construction. */
+    @Override
+    public int getMinFrameSize() {
+        return minFrameSize;
+    }
+    /** Returns the budget in bytes; {@code Integer.MAX_VALUE} when the pool was created as unlimited. */
+    @Override
+    public int getMemoryBudgetBytes() {
+        return memBudget;
+    }
+    /** Returns a frame of at least {@code frameSize} bytes (reused, new, or merged), or {@code null} if impossible. */
+    @Override
+    public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException {
+        int frameId = findExistingFrame(frameSize);
+        if (frameId >= 0) {
+            return reuseFrame(frameId);
+        }
+        if (haveEnoughFreeSpace(frameSize)) {
+            return createNewFrame(frameSize);
+        }
+        // Budget exhausted: try to trade idle frames for a single frame that is big enough.
+        return mergeExistingFrames(frameSize);
+    }
+    /** Tells whether {@code frameSize} more bytes fit in the budget; subtracts because the sum could overflow int. */
+    private boolean haveEnoughFreeSpace(int frameSize) {
+        return frameSize <= memBudget - allocateMem;
+    }
+    /** Returns the lowest index that is not marked as used. */
+    private static int getFirstUnUsedPos(BitSet used) {
+        return used.nextClearBit(0);
+    }
+    /** Returns the highest index at or below {@code lastPos} that is not marked as used, or -1 if there is none. */
+    private static int getLastUnUsedPos(BitSet used, int lastPos) {
+        return used.previousClearBit(lastPos);
+    }
+    /** Binary-searches the sorted idle range for an unused frame of at least {@code frameSize} bytes; -1 if none. */
+    private static int binarySearchUnusedBuffer(List<ByteBuffer> buffers, BitSet used, int frameSize) {
+        int l = getFirstUnUsedPos(used); // to skip the merged null buffers
+        int h = getLastUnUsedPos(used, (buffers.size() - 1)) + 1; // to skip the newly created buffers
+        if (l >= h) {
+            return -1;
+        }
+        int highest = h;
+        int mid = (l + h) >>> 1;
+        while (l < h) {
+            ByteBuffer buffer = buffers.get(mid);
+            if (buffer.capacity() == frameSize) {
+                break;
+            }
+            if (buffer.capacity() < frameSize) {
+                l = mid + 1;
+            } else {
+                h = mid;
+            }
+            mid = (l + h) >>> 1;
+        }
+        mid = used.nextClearBit(mid); // the hit may be in use: move up to the next idle frame
+        return mid < highest ? mid : -1;
+    }
+    /** Returns the index of an idle frame that can hold {@code frameSize} bytes, or -1 if there is none. */
+    private int findExistingFrame(int frameSize) {
+        return binarySearchUnusedBuffer(buffers, used, frameSize);
+    }
+    /** Marks frame {@code id} as used, clears its position/limit state and returns it. */
+    private ByteBuffer reuseFrame(int id) {
+        used.set(id);
+        buffers.get(id).clear();
+        return buffers.get(id);
+    }
+    /** Allocates a {@code frameSize}-byte frame from the context, appends it to the list and hands it out as used. */
+    private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException {
+        buffers.add(ctx.allocateFrame(frameSize));
+        allocateMem += frameSize;
+        return reuseFrame(buffers.size() - 1);
+    }
+
+    /**
+     * Frees idle frames in list order (smallest first after a reset) until the reclaimed bytes plus the unspent
+     * budget cover {@code frameSize}, then allocates one new frame of that combined size.
+     * A merged slot is set to {@code null} and marked as used; frames freed before giving up are not restored.
+     *
+     * @param frameSize the number of bytes requested
+     * @return the newly allocated frame, or {@code null} if merging every idle frame is still not enough
+     * @throws HyracksDataException propagated from allocating the merged frame
+     */
+    private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException {
+        int mergedSize = memBudget - allocateMem;
+        int highBound = getLastUnUsedPos(used, buffers.size() - 1) + 1;
+        for (int i = getFirstUnUsedPos(used); i < highBound; ++i) {
+            if (!used.get(i)) {
+                mergedSize += deAllocateFrame(i);
+                if (mergedSize >= frameSize) {
+                    return createNewFrame(mergedSize);
+                }
+            }
+        }
+        return null;
+    }
+    /** Gives frame {@code id} back to the context, leaves a used {@code null} slot behind and returns its capacity. */
+    private int deAllocateFrame(int id) {
+        ByteBuffer frame = buffers.get(id);
+        ctx.deallocateFrames(frame.capacity());
+        buffers.set(id, null);
+        used.set(id);
+        allocateMem -= frame.capacity();
+        return frame.capacity();
+    }
+    /** Makes every remaining frame reusable: drops merged slots, re-sorts by capacity and clears the used marks. */
+    @Override
+    public void reset() {
+        removeEmptySpot(buffers);
+        Collections.sort(buffers, sizeByteBufferComparator);
+        used.clear();
+    }
+
+    /**
+     * Drops the {@code null} placeholders left behind by merged frames.
+     * Uses one bulk removal, which is a single pass, instead of shifting the tail once per placeholder.
+     *
+     * @param buffers the list to compact in place
+     */
+    private static void removeEmptySpot(List<ByteBuffer> buffers) {
+        buffers.removeAll(Collections.singleton(null));
+    }
+    /** Drops all frames and zeroes the accounting. NOTE(review): ctx.deallocateFrames is not called here -- confirm. */
+    @Override
+    public void close() {
+        buffers.clear();
+        used.clear();
+        allocateMem = 0;
+    }
+
+    /**
+     * Orders buffers by ascending capacity; {@link #reset()} sorts with it so that the binary search is valid.
+     */
+    private static final Comparator<ByteBuffer> sizeByteBufferComparator = new Comparator<ByteBuffer>() {
+        @Override
+        public int compare(ByteBuffer o1, ByteBuffer o2) {
+            return Integer.compare(o1.capacity(), o2.capacity());
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
new file mode 100644
index 0000000..0b077c2
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+/** An {@link ITupleBufferManager} that stores variable-length tuples in pooled frames and supports deleting them. */
+public class VariableTupleMemoryManager implements ITupleBufferManager {
+
+    private final static Logger LOG = Logger.getLogger(VariableTupleMemoryManager.class.getName());
+
+    private final int minFreeSpace; // a frame is offered for further inserts only while it has more free bytes than this
+    private final IFramePool pool;
+    private final IFrameFreeSlotPolicy policy;
+    private final IAppendDeletableFrameTupleAccessor accessor;
+    private final ArrayList<ByteBuffer> frames;
+    private final RecordDescriptor recordDescriptor;
+    private int numTuples;
+    private int statsReOrg; // number of frame reorganizations, reported and zeroed by close()
+    /** Creates a manager that takes its frames from {@code framePool}; {@code recordDescriptor} describes the tuples. */
+    public VariableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
+        this.pool = framePool;
+        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
+        this.policy = new FrameFreeSlotLastFit(maxFrames);
+        this.accessor = new DeletableFrameTupleAppender(recordDescriptor);
+        this.frames = new ArrayList<>();
+        this.minFreeSpace = calculateMinFreeSpace(recordDescriptor);
+        this.recordDescriptor = recordDescriptor;
+        this.numTuples = 0;
+        this.statsReOrg = 0;
+    }
+    /** Resets the pool and the free-slot policy and forgets all frames and tuples; the pool is not closed. */
+    @Override
+    public void reset() throws HyracksDataException {
+        pool.reset();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+    }
+    /** Returns the number of inserts minus the number of deleteTuple calls since the last reset or close. */
+    @Override
+    public int getNumTuples() {
+        return numTuples;
+    }
+    /** Copies tuple {@code idx} of {@code fta} into a frame and points {@code tuplePointer} at it; false if no room. */
+    @Override
+    public boolean insertTuple(IFrameTupleAccessor fta, int idx, TuplePointer tuplePointer)
+            throws HyracksDataException {
+        int requiredFreeSpace = calculatePhysicalSpace(fta, idx);
+        int frameId = findAvailableFrame(requiredFreeSpace);
+        if (frameId < 0) {
+            if (!canBeInsertedAfterCleanUpFragmentation(requiredFreeSpace)) {
+                return false;
+            }
+            // Some frame has enough free plus deleted bytes in total: compact all frames and retry.
+            reOrganizeFrames();
+            frameId = findAvailableFrame(requiredFreeSpace);
+            statsReOrg++;
+        }
+        assert frameId >= 0;
+        accessor.reset(frames.get(frameId));
+        assert accessor.getContiguousFreeSpace() >= requiredFreeSpace;
+        int tid = accessor.append(fta, idx);
+        assert tid >= 0;
+        tuplePointer.reset(frameId, tid);
+        if (accessor.getContiguousFreeSpace() > minFreeSpace) {
+            policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace());
+        }
+        numTuples++;
+        return true;
+    }
+    /** Compacts every frame and rebuilds the free-slot policy from the resulting contiguous free space. */
+    private void reOrganizeFrames() {
+        policy.reset();
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            accessor.reOrganizeBuffer();
+            policy.pushNewFrame(i, accessor.getContiguousFreeSpace());
+        }
+    }
+    /** Tells whether some frame has at least {@code requiredFreeSpace} bytes once its deleted space is counted. */
+    private boolean canBeInsertedAfterCleanUpFragmentation(int requiredFreeSpace) {
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            if (accessor.getTotalFreeSpace() >= requiredFreeSpace) {
+                return true;
+            }
+        }
+        return false;
+    }
+    /** Returns the index of a frame with {@code requiredFreeSpace} contiguous free bytes, allocating one if needed; else -1. */
+    private int findAvailableFrame(int requiredFreeSpace) throws HyracksDataException {
+        int frameId = policy.popBestFit(requiredFreeSpace);
+        if (frameId >= 0) {
+            return frameId;
+        }
+        // No tracked frame has room: ask the pool for a new frame large enough for this tuple.
+        int frameSize = calculateMinFrameSizeToPlaceTuple(requiredFreeSpace, pool.getMinFrameSize());
+        ByteBuffer buffer = pool.allocateFrame(frameSize);
+        if (buffer == null) {
+            return -1;
+        }
+        accessor.clear(buffer);
+        frames.add(buffer);
+        return frames.size() - 1;
+    }
+    /** Smallest multiple of {@code minFrameSize} holding the tuple plus 8 trailer bytes (tuple count + deleted-space field). */
+    private static int calculateMinFrameSizeToPlaceTuple(int requiredFreeSpace, int minFrameSize) {
+        return (1 + (requiredFreeSpace + 8 - 1) / minFrameSize) * minFrameSize;
+    }
+
+    private static int calculatePhysicalSpace(IFrameTupleAccessor fta, int idx) {
+        // 4 bytes to store the offset
+        return 4 + fta.getTupleLength(idx);
+    }
+
+    private static int calculateMinFreeSpace(RecordDescriptor recordDescriptor) {
+        // + 4 for the tuple offset
+        return recordDescriptor.getFieldCount() * 4 + 4;
+    }
+    /** Marks the tuple that {@code tuplePointer} refers to as deleted; its space is reclaimed by a later compaction. */
+    @Override
+    public void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException {
+        accessor.reset(frames.get(tuplePointer.frameIndex));
+        accessor.delete(tuplePointer.tupleIndex);
+        numTuples--;
+    }
+    /** Closes the pool, forgets all frames and tuples, and logs the reorganization count at FINE level. */
+    @Override
+    public void close() {
+        pool.close();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("VariableTupleMemoryManager has reorganized " + statsReOrg + " times");
+        }
+        statsReOrg = 0;
+    }
+    /** Returns a new accessor, with its own frame accessor, for reading tuples stored in this manager's frames. */
+    @Override
+    public ITupleBufferAccessor getTupleAccessor() {
+        return new ITupleBufferAccessor() {
+            private final IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
+                    recordDescriptor);
+            private int tid;
+
+            @Override
+            public void reset(TuplePointer tuplePointer) {
+                bufferAccessor.reset(frames.get(tuplePointer.frameIndex));
+                tid = tuplePointer.tupleIndex;
+            }
+
+            @Override
+            public ByteBuffer getTupleBuffer() {
+                return bufferAccessor.getBuffer();
+            }
+
+            @Override
+            public int getTupleStartOffset() {
+                return bufferAccessor.getTupleStartOffset(tid);
+            }
+
+            @Override
+            public int getTupleLength() {
+                return bufferAccessor.getTupleLength(tid);
+            }
+
+            @Override
+            public int getAbsFieldStartOffset(int fieldId) {
+                return bufferAccessor.getAbsoluteFieldStartOffset(tid, fieldId);
+            }
+
+            @Override
+            public int getFieldLength(int fieldId) {
+                return bufferAccessor.getFieldLength(tid, fieldId);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
new file mode 100644
index 0000000..9b03a77
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/**
+ * A frame tuple appender/accessor that also supports deleting tuples; {@code VariableTupleMemoryManager} uses it.
+ * Deletion is recorded in place: the end-offset slot of a deleted tuple is set to a negative number, and a special
+ * <code>deleted_space</code> field in the last 4 bytes of the buffer remembers how many bytes have been deleted.
+ * The tuple count sits 4 bytes in front of {@code FrameHelper.getTupleCountOffset}; the end-offset slots precede it.
+ */
+public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAccessor {
+
+    private static final int SIZE_DELETED_SPACE = 4; // size in bytes of the trailing deleted_space field
+    private final RecordDescriptor recordDescriptor;
+    private ByteBuffer buffer;
+    private int tupleCountOffset; // cached position of the tuple count inside the bound buffer
+    private int tupleCount; // cached tuple count; tuples marked deleted are still counted until reclaimed
+    private int freeDataEndOffset; // offset just past the last tuple's data, i.e. where the next append writes
+    private int deletedSpace; // cached value of the deleted_space field
+    private byte[] array;   // backing array of buffer, cached to speed up the array access a little
+    /** Creates an appender for tuples described by {@code recordDescriptor}; bind a buffer with clear() or reset(). */
+    public DeletableFrameTupleAppender(RecordDescriptor recordDescriptor) {
+        this.recordDescriptor = recordDescriptor;
+    }
+    /** Position of the tuple count: FrameHelper's position shifted SIZE_DELETED_SPACE bytes towards the buffer start. */
+    private int getTupleCountOffset() {
+        return FrameHelper.getTupleCountOffset(buffer.capacity()) - SIZE_DELETED_SPACE;
+    }
+    /** End of the tuple data: 0 for an empty frame, else the absolute value of the last tuple's end-offset slot. */
+    private int getFreeDataEndOffset() {
+        return tupleCount == 0 ? 0 : Math.abs(IntSerDeUtils.getInt(array, tupleCountOffset - tupleCount * 4));
+    }
+    /** Writes {@code offset} into the end-offset slot of the last tuple (index {@code tupleCount - 1}). */
+    private void setFreeDataEndOffset(int offset) {
+        assert (offset >= 0);
+        IntSerDeUtils.putInt(array, tupleCountOffset - tupleCount * 4, offset);
+    }
+    /** Stores {@code count} in the buffer's tuple-count field; the cached {@code tupleCount} is not touched. */
+    private void setTupleCount(int count) {
+        IntSerDeUtils.putInt(array, tupleCountOffset, count);
+    }
+    /** Stores {@code count} in the deleted_space field (the last 4 bytes of the buffer). */
+    private void setDeleteSpace(int count) {
+        IntSerDeUtils.putInt(array, buffer.capacity() - SIZE_DELETED_SPACE, count);
+    }
+    /** Reads the tuple count stored in the buffer. */
+    private int getPhysicalTupleCount() {
+        return IntSerDeUtils.getInt(array, tupleCountOffset);
+    }
+    /** Reads the deleted_space field stored in the buffer. */
+    private int getDeletedSpace() {
+        return IntSerDeUtils.getInt(array, buffer.capacity() - SIZE_DELETED_SPACE);
+    }
+    /** Binds {@code buffer} and initializes it as an empty frame: tuple count and deleted_space are both set to 0. */
+    @Override
+    public void clear(ByteBuffer buffer) throws HyracksDataException {
+        this.buffer = buffer;
+        this.array = buffer.array();
+        tupleCountOffset = getTupleCountOffset();
+        setTupleCount(0);
+        setDeleteSpace(0);
+        resetCounts();
+    }
+    /** Binds {@code buffer} without modifying it and loads the cached counters from its contents. */
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+        this.array = buffer.array();
+        tupleCountOffset = getTupleCountOffset();
+        resetCounts();
+    }
+    /** Reloads the cached counters from the buffer; tupleCount must be read before the free-data end is derived. */
+    private void resetCounts() {
+        deletedSpace = getDeletedSpace();
+        tupleCount = getPhysicalTupleCount();
+        freeDataEndOffset = getFreeDataEndOffset();
+    }
+
+    /**
+     * Appends the tuple to the frame. This method does not validate the space; make sure there is enough
+     * by calling {@link #getContiguousFreeSpace()} first.
+     *
+     * @param tupleAccessor the accessor over the frame that holds the source tuple
+     * @param tIndex the index of the source tuple within {@code tupleAccessor}
+     * @return the index of the appended tuple within this frame
+     * @throws HyracksDataException declared in the signature; no statement visible in this method raises it
+     */
+    @Override
+    public int append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
+        byte[] src = tupleAccessor.getBuffer().array();
+        int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
+        int length = tupleAccessor.getTupleLength(tIndex);
+        System.arraycopy(src, tStartOffset, array, freeDataEndOffset, length);
+        setTupleCount(++tupleCount); // count first: setFreeDataEndOffset addresses the slot of the new last tuple
+        freeDataEndOffset += length;
+        setFreeDataEndOffset(freeDataEndOffset);
+        return tupleCount - 1;
+    }
+    /** Marks the tuple deleted by negating its end-offset slot and adds its length to deleted_space; no-op unless the slot is positive. */
+    @Override
+    public void delete(int tupleIndex) {
+        int endOffset = getTupleEndOffset(tupleIndex);
+        if (endOffset > 0) {
+            setTupleEndOffset(tupleIndex, -endOffset);
+            deletedSpace += endOffset - getTupleStartOffset(tupleIndex);
+            setDeleteSpace(deletedSpace);
+        }
+    }
+    /** Compacts live tuples towards the buffer start; trailing deleted tuples are dropped, inner ones become zero-length. */
+    @Override
+    public void reOrganizeBuffer() {
+        if (deletedSpace <= 0) {
+            return; // nothing was deleted, so there is nothing to compact
+        }
+        reclaimDeletedEnding();
+
+        freeDataEndOffset = 0;
+        int endOffset = 0;
+        for (int i = 0; i < tupleCount; i++) {
+            int startOffset = Math.abs(endOffset); // previous tuple's original end, read before its slot was rewritten
+            endOffset = getTupleEndOffset(i);
+            if (endOffset >= 0) { // live tuple: slide its bytes down to the compacted position
+                int length = endOffset - startOffset;
+                assert ( length >= 0);
+                if (freeDataEndOffset != startOffset) {
+                    System.arraycopy(array, startOffset, array, freeDataEndOffset, length);
+                }
+                freeDataEndOffset += length;
+            }
+            setTupleEndOffset(i, freeDataEndOffset); // a deleted tuple keeps its index but now has length 0
+        }
+        setFreeDataEndOffset(freeDataEndOffset);
+        deletedSpace = 0;
+        setDeleteSpace(0);
+    }
+    /** Lowers the tuple count past every deleted tuple at the tail, stopping at the last live tuple. */
+    private void reclaimDeletedEnding() {
+        for (int i = tupleCount - 1; i >= 0; i--) {
+            int endOffset = getTupleEndOffset(i);
+            if (endOffset < 0) {
+                tupleCount--;
+            } else {
+                break;
+            }
+        }
+        setTupleCount(tupleCount);
+    }
+    /** Returns the contiguous free space plus the bytes occupied by tuples marked deleted. */
+    @Override
+    public int getTotalFreeSpace() {
+        return getContiguousFreeSpace() + deletedSpace;
+    }
+    /** Returns the gap between the end of the tuple data and the lowest end-offset slot. */
+    @Override
+    public int getContiguousFreeSpace() {
+        return getTupleCountOffset() - tupleCount * 4 - freeDataEndOffset;
+    }
+    /** Returns the field count of the record descriptor. */
+    @Override
+    public int getFieldCount() {
+        return recordDescriptor.getFieldCount();
+    }
+    /** Returns the size of a tuple's field-slot table: 4 bytes per field. */
+    @Override
+    public int getFieldSlotsLength() {
+        return recordDescriptor.getFieldCount() * 4;
+    }
+    /** Returns the end offset of field {@code fIdx}, read from slot {@code fIdx} at the start of the tuple. */
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return IntSerDeUtils.getInt(array, getTupleStartOffset(tupleIndex) + fIdx * 4);
+    }
+    /** Returns the start offset of field {@code fIdx}: 0 for the first field, else the previous field's end offset. */
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return fIdx == 0 ? 0 : IntSerDeUtils.getInt(array, getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+    }
+    /** Returns the length of field {@code fIdx}: its end offset minus its start offset. */
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+    }
+    /** Returns the tuple's length; for a tuple marked deleted (negative end offset) the result is the negated length. */
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        int endOffset = getTupleEndOffset(tupleIndex);
+        if (endOffset < 0) {
+            return endOffset + getTupleStartOffset(tupleIndex);
+        }
+        return endOffset - getTupleStartOffset(tupleIndex);
+    }
+    /** Returns the raw end-offset slot of the tuple; the value is negative while the tuple is marked deleted. */
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return IntSerDeUtils.getInt(array, tupleCountOffset - 4 * (tupleIndex + 1));
+    }
+    /** Overwrites the end-offset slot of the tuple with {@code offset}. */
+    private void setTupleEndOffset(int tupleIndex, int offset) {
+        IntSerDeUtils.putInt(array, tupleCountOffset - 4 * (tupleIndex + 1), offset);
+    }
+    /** Returns where the tuple starts: 0 for tuple 0, else the absolute value of the previous tuple's end offset. */
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        int offset = tupleIndex == 0 ? 0 : IntSerDeUtils.getInt(array, tupleCountOffset - 4 * tupleIndex);
+        return Math.abs(offset);
+    }
+    /** Returns the buffer offset of field {@code fIdx}: tuple start + field-slot table + the field's start offset. */
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + getFieldStartOffset(tupleIndex, fIdx);
+    }
+    /** Returns the cached tuple count, which still includes deleted tuples that have not been reclaimed. */
+    @Override
+    public int getTupleCount() {
+        return tupleCount;
+    }
+    /** Returns the buffer bound by the last clear() or reset() call. */
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java
new file mode 100644
index 0000000..12ba72f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * This {@code GroupFrameAccessor} accesses a group of logical frames which are stored in one physically
+ * contiguous ByteBuffer. It is used in a RunFileReader which can read several frames at once, and we
+ * can use this accessor to parse the returned data as one frame. In the caller's view there is only
+ * one frame, which simplifies the caller's work.
+ * <p>
+ * Tuple indexes passed to this accessor are global across the group; each call locates the inner
+ * frame holding the tuple, points the wrapped {@link FrameTupleAccessor} at it and delegates with
+ * the frame-local index.
+ */
+public class GroupFrameAccessor implements IFrameTupleAccessor {
+
+    /**
+     * Position of one inner frame inside the grouped buffer, together with the cumulative number of
+     * tuples contained in this frame and all frames before it.
+     * <p>
+     * It is comparable to an {@code Integer} tuple index so that {@link Collections#binarySearch}
+     * can search the list of frames by tuple index. The class is static because it never touches the
+     * enclosing accessor.
+     */
+    private static final class InnerFrameInfo implements Comparable<Integer> {
+        final int start;
+        final int length;
+        final int tupleCount;
+
+        InnerFrameInfo(int start, int length, int tupleCount) {
+            this.start = start;
+            this.length = length;
+            this.tupleCount = tupleCount;
+        }
+
+        /**
+         * Orders this frame's cumulative tuple count against the given tuple index.
+         */
+        @Override
+        public int compareTo(Integer o) {
+            return Integer.compare(tupleCount, o);
+        }
+    }
+
+    private final RecordDescriptor recordDescriptor;
+    private final int minFrameSize;
+    private final FrameTupleAccessor frameTupleAccessor;
+    private final List<InnerFrameInfo> innerFrameInfos;
+    // Cache of the most recent lookup, so repeated calls for the same tuple skip the search.
+    private int lastTupleIndex;
+    private int lastFrameId;
+    private ByteBuffer buffer;
+
+    /**
+     * @param minFrameSize     size in bytes of one minimum frame; inner frame sizes are multiples of it
+     * @param recordDescriptor descriptor of the tuples stored in the frames
+     */
+    public GroupFrameAccessor(int minFrameSize, RecordDescriptor recordDescriptor) {
+        this.minFrameSize = minFrameSize;
+        this.recordDescriptor = recordDescriptor;
+        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.innerFrameInfos = new ArrayList<>();
+    }
+
+    @Override
+    public int getFieldCount() {
+        return recordDescriptor.getFieldCount();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return frameTupleAccessor.getFieldSlotsLength();
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getFieldEndOffset(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getFieldStartOffset(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getFieldLength(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        return frameTupleAccessor.getTupleLength(resetSubTupleAccessor(tupleIndex));
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return frameTupleAccessor.getTupleEndOffset(resetSubTupleAccessor(tupleIndex));
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return frameTupleAccessor.getTupleStartOffset(resetSubTupleAccessor(tupleIndex));
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getAbsoluteFieldStartOffset(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    /**
+     * @return the total number of tuples over all parsed inner frames, or 0 if none has been parsed
+     */
+    @Override
+    public int getTupleCount() {
+        if (innerFrameInfos.isEmpty()) {
+            return 0;
+        }
+        // The last entry carries the cumulative count of the whole group.
+        return innerFrameInfos.get(innerFrameInfos.size() - 1).tupleCount;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    /**
+     * Binds this accessor to {@code buffer}, forgets the cached lookup and parses the inner frames
+     * found between offset 0 and the buffer's limit. As a side effect the buffer's position is moved
+     * to the offset where parsing stopped.
+     */
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+        this.lastTupleIndex = -1;
+        this.lastFrameId = -1;
+        parseGroupedBuffer(0, buffer.limit());
+    }
+
+    /**
+     * Rebuilds the list of inner frames by walking the buffer from {@code start} towards {@code stop},
+     * then sets the buffer position to the first offset that was not consumed.
+     */
+    private void parseGroupedBuffer(int start, int stop) {
+        innerFrameInfos.clear();
+        int offset = start;
+        while (offset < stop) {
+            int unitSize = FrameHelper.deserializeNumOfMinFrame(buffer, offset) * minFrameSize;
+            if (unitSize == 0) { // run consumed.
+                break;
+            }
+            if (offset + unitSize > stop) { // contains future partial run, stop here
+                break;
+            }
+            frameTupleAccessor.reset(buffer, offset, unitSize);
+            int cumulativeTupleCount = getTupleCount() + frameTupleAccessor.getTupleCount();
+            innerFrameInfos.add(new InnerFrameInfo(offset, unitSize, cumulativeTupleCount));
+            offset += unitSize;
+        }
+        buffer.position(offset); // reading stops here.
+    }
+
+    /**
+     * Points the wrapped accessor at the inner frame that holds the given global tuple index.
+     *
+     * @param tupleIndex tuple index counted over the whole group
+     * @return the index of that tuple inside its inner frame
+     */
+    private int resetSubTupleAccessor(int tupleIndex) {
+        assert tupleIndex < getTupleCount();
+        if (innerFrameInfos.size() == 1) {
+            // parseGroupedBuffer left the wrapped accessor on the only frame; indexes coincide.
+            return tupleIndex;
+        }
+        if (tupleIndex != lastTupleIndex) {
+            int subFrameId = Collections.binarySearch(innerFrameInfos, tupleIndex);
+            // An exact hit means tupleIndex equals the cumulative count of that frame, so the tuple is
+            // the first one of the next frame; otherwise decode the insertion point.
+            subFrameId = subFrameId >= 0 ? subFrameId + 1 : -subFrameId - 1;
+            InnerFrameInfo target = innerFrameInfos.get(subFrameId);
+            frameTupleAccessor.reset(buffer, target.start, target.length);
+            lastTupleIndex = tupleIndex;
+            lastFrameId = subFrameId;
+        }
+        // Subtract the tuples that live in the frames preceding the selected one.
+        return lastFrameId > 0 ? lastTupleIndex - innerFrameInfos.get(lastFrameId - 1).tupleCount : lastTupleIndex;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java
new file mode 100644
index 0000000..b273f5b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A {@link VSizeFrame} whose {@link #reset()} keeps the bytes that have not been consumed yet
+ * (those between the buffer's position and its limit) by moving them to the front of the buffer
+ * instead of discarding them.
+ */
+public class GroupVSizeFrame extends VSizeFrame {
+
+    public GroupVSizeFrame(IHyracksCommonContext ctx, int frameSize) throws HyracksDataException {
+        super(ctx, frameSize);
+    }
+
+    /**
+     * Clears the buffer when nothing has been consumed or nothing is left over; otherwise moves the
+     * left-over bytes to the start of the buffer.
+     */
+    @Override
+    public void reset() throws HyracksDataException {
+        final boolean holdsPartialFuture = buffer.position() > 0 && buffer.hasRemaining();
+        if (!holdsPartialFuture) {
+            buffer.clear();
+            return;
+        }
+        movePartialFutureToStartPosition();
+    }
+
+    /**
+     * Compacts the buffer and marks the area after the moved bytes, unless the current position has
+     * already been marked as cleared by an earlier call.
+     */
+    private void movePartialFutureToStartPosition() {
+        assert buffer.hasArray();
+        if (FrameHelper.hasBeenCleared(buffer, buffer.position())) {
+            return;
+        }
+        buffer.compact();
+        // mark it to make reset idempotent
+        FrameHelper.clearRemainingFrame(buffer, buffer.position());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
new file mode 100644
index 0000000..01744f9
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Basically it is a union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}.
+ * Moreover, it has a delete function as well.
+ * This is a special TupleAccessor used for TopK sorting.
+ * In HeapSort, or other tuple-based operators, we need to append a tuple, access an arbitrary previously
+ * inserted tuple, and delete a previously inserted tuple.
+ */
+public interface IAppendDeletableFrameTupleAccessor extends IFrameTupleAccessor {
+
+    /**
+     * Prepares to write on this buffer.
+     *
+     * @param buffer the buffer that subsequent appends will write to
+     * @throws HyracksDataException
+     */
+    void clear(ByteBuffer buffer) throws HyracksDataException;
+
+    /**
+     * Appends the tuple content to this buffer and returns the new tid as a handle to the caller.
+     *
+     * @param tupleAccessor the accessor from which the tuple is read
+     * @param tIndex        the index of the tuple inside {@code tupleAccessor}
+     * @return the tid assigned to the appended tuple
+     * @throws HyracksDataException
+     */
+    int append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException;
+
+    /**
+     * Removes the tuple identified by the given tid.
+     *
+     * @param tid the handle previously returned by {@link #append(IFrameTupleAccessor, int)}
+     */
+    void delete(int tid);
+
+    /**
+     * Reorganizes the space to remove the unused space and make the free space contiguous.
+     */
+    void reOrganizeBuffer();
+
+    /**
+     * @return the total amount of free space in the buffer, including the fragmented space
+     */
+    int getTotalFreeSpace();
+
+    /**
+     * @return the amount of contiguous free space in the buffer.
+     */
+    int getContiguousFreeSpace();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java
new file mode 100644
index 0000000..5d8b252
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.dataflow.std.util.MathUtil;
+
+/**
+ * Array-backed skeleton for heaps of {@link IResetableComparable} entries. Entries are stored in
+ * level order: the children of slot {@code i} live at {@code 2i + 1} and {@code 2i + 2}. Elements
+ * are copied into heap-owned entry objects via {@code reset(...)} rather than stored by reference.
+ * Subclasses supply the ordering policy through {@link #bubbleUp(int)} and {@link #trickleDown(int)}.
+ */
+public abstract class AbstractHeap implements IHeap<IResetableComparable> {
+    protected static final int NOT_EXIST = -1;
+    protected static final int MAX_INITIAL_CAPACITY = 1024;
+    protected IResetableComparable[] entries;
+    protected IResetableComparable tempEntry;
+    protected IResetableComparableFactory factory;
+    protected int numEntry;
+
+    /**
+     * @param factory  creates the entry objects owned by the heap
+     * @param capacity requested initial capacity; clamped into {@code [1, MAX_INITIAL_CAPACITY]}
+     */
+    public AbstractHeap(IResetableComparableFactory factory, int capacity) {
+        int initialCapacity = Math.max(1, capacity);
+        if (initialCapacity > MAX_INITIAL_CAPACITY) {
+            initialCapacity = MAX_INITIAL_CAPACITY;
+        }
+        this.entries = new IResetableComparable[initialCapacity];
+        this.numEntry = 0;
+        this.tempEntry = factory.createResetableComparable();
+        this.factory = factory;
+    }
+
+    /**
+     * Copies {@code element} into the next free slot, doubling the backing array when it is full,
+     * and then lets the subclass restore the heap order from that slot upwards.
+     */
+    @Override
+    public void insert(IResetableComparable element) {
+        final int slot = numEntry;
+        if (slot >= entries.length) {
+            entries = Arrays.copyOf(entries, entries.length * 2);
+        }
+        if (entries[slot] == null) {
+            entries[slot] = factory.createResetableComparable();
+        }
+        final IResetableComparable holder = entries[slot];
+        // The count is bumped before the copy, matching a post-increment on the slot index.
+        numEntry = slot + 1;
+        holder.reset(element);
+        bubbleUp(slot);
+    }
+
+    /** Restores the heap order starting at slot {@code i} and moving towards the root. */
+    protected abstract void bubbleUp(int i);
+
+    /** Restores the heap order starting at slot {@code i} and moving towards the leaves. */
+    protected abstract void trickleDown(int i);
+
+    /** Exchanges the contents of the two slots, using {@code tempEntry} as scratch space. */
+    protected void swap(int cid, int pid) {
+        final IResetableComparable first = entries[cid];
+        tempEntry.reset(first);
+        first.reset(entries[pid]);
+        entries[pid].reset(tempEntry);
+    }
+
+    /** Compares the entry in slot {@code i} with the entry in slot {@code j}. */
+    protected int compareTo(int i, int j) {
+        final IResetableComparable left = entries[i];
+        return left.compareTo(entries[j]);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return 0 == numEntry;
+    }
+
+    /** Drops every stored entry object and sets the entry count back to zero. */
+    @Override
+    public void reset() {
+        Arrays.fill(entries, 0, numEntry, null);
+        numEntry = 0;
+    }
+
+    /**
+     * Exposes the backing array. Callers can manipulate the entries through it, which may violate
+     * the heap property. Use with care.
+     *
+     * @return the backing array itself, not a copy
+     */
+    @Deprecated
+    public IResetableComparable[] getEntries() {
+        return entries;
+    }
+
+    @Override
+    public int getNumEntries() {
+        return numEntry;
+    }
+
+    /** Returns {@code MathUtil.log2Floor(cid + 1)} for the given slot. */
+    protected int getLevel(int cid) {
+        return MathUtil.log2Floor(cid + 1);
+    }
+
+    /** Returns the parent slot of {@code cid}, or {@code NOT_EXIST} for the root and negative ids. */
+    static int getParentId(int cid) {
+        if (cid < 1) {
+            return NOT_EXIST;
+        }
+        return (cid - 1) / 2;
+    }
+
+    /** Returns the left child slot of {@code id}, or {@code NOT_EXIST} if it is not below {@code numEntry}. */
+    static int getLeftChild(int id, int numEntry) {
+        return childOrNotExist(id * 2 + 1, numEntry);
+    }
+
+    protected int getLeftChild(int id) {
+        return getLeftChild(id, numEntry);
+    }
+
+    /** Returns the right child slot of {@code id}, or {@code NOT_EXIST} if it is not below {@code numEntry}. */
+    static int getRightChild(int id, int numEntry) {
+        return childOrNotExist(id * 2 + 2, numEntry);
+    }
+
+    protected int getRightChild(int id) {
+        return getRightChild(id, numEntry);
+    }
+
+    /** Maps a candidate child slot to {@code NOT_EXIST} when it lies at or beyond {@code numEntry}. */
+    private static int childOrNotExist(int cid, int numEntry) {
+        return cid < numEntry ? cid : NOT_EXIST;
+    }
+
+    /** Returns the parent of the parent of {@code id}, or {@code NOT_EXIST} if either is missing. */
+    protected int getGrandParentId(int id) {
+        final int pid = getParentId(id);
+        if (pid == NOT_EXIST) {
+            return NOT_EXIST;
+        }
+        return getParentId(pid);
+    }
+
+    /** Tells whether {@code id} is the parent slot of {@code childId}. */
+    protected boolean isDirectChild(int id, int childId) {
+        return getParentId(childId) == id;
+    }
+
+    /**
+     * Returns the child slot of {@code id} holding the smaller entry; the left child wins ties.
+     * Returns {@code NOT_EXIST} when {@code id} is {@code NOT_EXIST} or has no child.
+     */
+    protected int getMinChild(int id) {
+        if (id == NOT_EXIST) {
+            return NOT_EXIST;
+        }
+        final int left = getLeftChild(id, numEntry);
+        if (left == NOT_EXIST) {
+            return NOT_EXIST;
+        }
+        final int right = getRightChild(id, numEntry);
+        if (right == NOT_EXIST) {
+            return left;
+        }
+        return compareTo(right, left) < 0 ? right : left;
+    }
+
+    /**
+     * Returns the child slot of {@code id} holding the larger entry; the left child wins ties.
+     * Returns {@code NOT_EXIST} when {@code id} is {@code NOT_EXIST} or has no child.
+     */
+    protected int getMaxChild(int id) {
+        if (id == NOT_EXIST) {
+            return NOT_EXIST;
+        }
+        final int left = getLeftChild(id, numEntry);
+        if (left == NOT_EXIST) {
+            return NOT_EXIST;
+        }
+        final int right = getRightChild(id, numEntry);
+        if (right == NOT_EXIST) {
+            return left;
+        }
+        return compareTo(right, left) > 0 ? right : left;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java
new file mode 100644
index 0000000..f650f12
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IHeap<T> {
+    /**
+     * Inserts a new element into the heap
+     *
+     * @param element to be inserted
+     */
+    void insert(T element);
+
+    /**
+     * @return True if the heap does not have any element, false
+     * otherwise
+     */
+    boolean isEmpty();
+
+    /**
+     * Removes all the elements in the heap
+     */
+    void reset();
+
+    /**
+     * Returns the number of elements currently held in the heap
+     *
+     * @return the current number of entries
+     */
+    int getNumEntries();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java
new file mode 100644
index 0000000..d932a0e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IMaxHeap<T> extends IHeap<T> {
+    /**
+     * Removes the largest element in the heap and hands it back through {@code result}.
+     * Make sure the heap is not empty (by {@link #isEmpty()}) before calling this method
+     *
+     * @param result the object that receives the removed maximum entry
+     */
+    void getMax(T result);
+
+    /**
+     * Returns (and does NOT remove) the largest element in the heap
+     *
+     * @param result is the object that will eventually contain maximum entry
+     *               pointer
+     */
+    void peekMax(T result);
+
+    /**
+     * Removes the current max and inserts a new element.
+     * Normally it is faster than calling getMax() followed by insert()
+     *
+     * @param newElement the element that takes the place of the removed maximum
+     */
+    void replaceMax(T newElement);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java
new file mode 100644
index 0000000..784c492
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IMinHeap<T> extends IHeap<T> {
+    /**
+     * Removes the smallest element in the heap and hands it back through {@code result}.
+     * Make sure the heap is not empty (by {@link #isEmpty()}) before calling this method
+     *
+     * @param result the object that receives the removed minimum entry
+     */
+    void getMin(T result);
+
+    /**
+     * Returns (and does NOT remove) the smallest element in the heap
+     *
+     * @param result is the object that will eventually contain minimum entry
+     *               pointer
+     */
+    void peekMin(T result);
+
+    /**
+     * Removes the current min and inserts a new element.
+     * Normally it is faster than calling getMin() followed by insert()
+     *
+     * @param newElement the element that takes the place of the removed minimum
+     */
+    void replaceMin(T newElement);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java
new file mode 100644
index 0000000..8225d88
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+/**
+ * A heap that offers both the minimum-side operations of {@link IMinHeap} and the maximum-side
+ * operations of {@link IMaxHeap}. It declares no methods of its own.
+ */
+public interface IMinMaxHeap<T> extends IMinHeap<T>, IMaxHeap<T> {
+}
\ No newline at end of file