You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:23 UTC

[06/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
deleted file mode 100644
index d78af12..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-/**
- * Run generator for sorting with replacement selection when there is no limit
- * on the output, i.e. the whole input must be sorted.
- * <p>
- * A {@link SortMinHeap} is the selection tree that decides the order in which
- * tuples are written into the runs, while a memory manager based on a binary
- * search tree ({@link BSTMemMgr}) allocates memory slots for the tuples. The
- * overall process is:
- * <ul>
- * <li>Read the input frame by frame. For each tuple T in the current frame:
- * <ul>
- * <li>Try to allocate a memory slot for T together with the header/footer the
- * memory manager attaches to it.</li>
- * <li>If T cannot be allocated, output as many memory-resident tuples as
- * needed (the min-heap picks which one goes next) until a free slot large
- * enough for T is created.</li>
- * <li>Write T into memory.</li>
- * <li>Compute the run id of T from the last tuple written to the current run:
- * it is either the current run or the next one. Also compute the Poorman's
- * Normalized Key (PNK) of T to make later comparisons cheaper.</li>
- * <li>Insert a heap entry for T holding its run id, its slot location and its
- * PNK.</li>
- * </ul>
- * </li>
- * <li>On close, write every memory-resident tuple into its run(s), again in
- * min-heap order.</li>
- * </ul>
- * OptimizedSortOperatorDescriptor merges the generated runs to produce the
- * final sorted output.
- *
- * @author pouria
- */
-public class OptimizedExternalSortRunGenerator implements IRunGenerator {
-    private final IHyracksTaskContext ctx;
-    private final int[] sortFields;
-    // Normalized-key computer for the first sort field; null when no factory was supplied.
-    private final INormalizedKeyComputer nkc;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDescriptor;
-    // Readers for the completed run files, in the order the runs were closed.
-    private final List<IFrameReader> runs;
-
-    // Selection tree (a SortMinHeap, created in open()) over the memory-resident tuples.
-    private ISelectionTree sTree;
-    private IMemoryManager memMgr;
-
-    // Memory budget passed to the memory manager in open().
-    private final int memSize;
-    // Used to read tuples in nextFrame().
-    private FrameTupleAccessor inputAccessor;
-    // Used to write tuples into the dedicated output buffer.
-    private FrameTupleAppender outputAppender;
-    // Dedicated output buffer used to write tuples into the run(s).
-    private ByteBuffer outputBuffer;
-    // Used to read the last output record back from the output buffer.
-    private FrameTupleAccessor lastRecordAccessor;
-    // Index of the last output tuple in the dedicated output buffer (-1 right after a run is opened).
-    private int lastTupleIx;
-    // Slot allocated by the memory manager for the incoming tuple.
-    private Slot allocationPtr;
-    // Slot of the tuple chosen by the selection tree for output.
-    private Slot outputedTuple;
-    // Scratch array receiving the top entry of the selection tree
-    // (indexed with SortMinHeap.RUN_ID_IX / FRAME_IX / OFFSET_IX).
-    private int[] sTreeTop;
-
-    // Writer of the run currently being produced.
-    private RunFileWriter writer;
-
-    // Set when a run is opened; cleared once the first tuple has been written to it.
-    private boolean newRun;
-    private int curRunId;
-
-    /**
-     * @param ctx task context (frame size, frame allocation, I/O manager and workspace files)
-     * @param sortFields indexes of the sort fields; sortFields[0] feeds the normalized key
-     * @param firstKeyNormalizerFactory factory for the normalized key of the first sort field; may be null
-     * @param comparatorFactories one comparator factory per sort field, in sort-field order
-     * @param recordDesc descriptor of the input records
-     * @param memSize memory budget handed to the memory manager (units as defined by BSTMemMgr)
-     */
-    public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDesc, int memSize) {
-        this.ctx = ctx;
-        this.sortFields = sortFields;
-        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
-        this.comparatorFactories = comparatorFactories;
-        comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        this.recordDescriptor = recordDesc;
-        this.runs = new LinkedList<IFrameReader>();
-        this.memSize = memSize;
-    }
-
-    /**
-     * Clears previously produced runs, allocates the output frame, creates the
-     * memory manager and the selection tree, and opens the first run.
-     */
-    @Override
-    public void open() throws HyracksDataException {
-        runs.clear();
-        inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputBuffer = ctx.allocateFrame();
-        outputAppender.reset(outputBuffer, true);
-        lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-
-        this.memMgr = new BSTMemMgr(ctx, memSize);
-        this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
-        this.allocationPtr = new Slot();
-        this.outputedTuple = new Slot();
-        // -1 in every position marks the scratch entry as invalid (see isEntryValid()).
-        this.sTreeTop = new int[] { -1, -1, -1, -1 };
-        // openNewRun() increments this, so the first run gets id 0.
-        curRunId = -1;
-        openNewRun();
-    }
-
-    /**
-     * Stores every tuple of {@code buffer} in memory and inserts a heap entry
-     * for it. When the memory manager cannot allocate a slot, tuples are
-     * written out to the runs via outputRecord() until it reports at least the
-     * tuple length as freed, and allocation is retried.
-     *
-     * @param buffer the input frame
-     * @throws HyracksDataException if outputRecord() frees nothing (returns a value below 1)
-     */
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        inputAccessor.reset(buffer);
-        // Fetched once per frame so getPNK() does not call array() for every tuple.
-        byte[] bufferArray = buffer.array();
-        int tupleCount = inputAccessor.getTupleCount();
-        for (int i = 0; i < tupleCount; ++i) {
-            allocationPtr.clear();
-            int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
-            memMgr.allocate(tLength, allocationPtr);
-            // Allocation failed: write tuples out until outputRecord() reports at
-            // least tLength freed, then retry; repeat while allocation still fails.
-            while (allocationPtr.isNull()) {
-                int unAllocSize = -1;
-                while (unAllocSize < tLength) {
-                    unAllocSize = outputRecord();
-                    if (unAllocSize < 1) {
-                        throw new HyracksDataException(
-                                "Unable to allocate space for the new tuple, while there is no more tuple to output");
-                    }
-                }
-                memMgr.allocate(tLength, allocationPtr);
-            }
-            memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
-            int runId = getRunId(inputAccessor, i);
-            int pnk = getPNK(inputAccessor, i, bufferArray);
-            // Heap entry layout: {runId, frameIx, offset, pnk}.
-            int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
-            sTree.insert(entry);
-        }
-    }
-
-    /** Intentionally a no-op: no failure-specific cleanup is performed. */
-    @Override
-    public void fail() throws HyracksDataException {
-    }
-
-    /**
-     * Writes every remaining memory-resident tuple to its run(s), flushes and
-     * closes the last run, registers its reader and closes the memory manager.
-     */
-    @Override
-    public void close() throws HyracksDataException {
-        // Output the remaining elements of the selection tree.
-        while (!sTree.isEmpty()) {
-            outputRecord();
-        }
-        // Write the very last buffered records to the run file.
-        if (outputAppender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(outputBuffer, writer);
-        }
-        outputAppender.reset(outputBuffer, true);
-        writer.close();
-        runs.add(writer.createReader());
-        memMgr.close();
-    }
-
-    /** @return readers for the generated runs (the live internal list). */
-    public List<IFrameReader> getRuns() {
-        return runs;
-    }
-
-    /**
-     * Takes the minimum entry from the selection tree (close() relies on this
-     * removing it) and appends its tuple to the current run, opening a new run
-     * first when the entry carries a different run id. The tuple's memory slot
-     * is then released.
-     *
-     * @return the value returned by memMgr.unallocate() for the released slot;
-     *         nextFrame() compares it with the size it needs to allocate
-     * @throws HyracksDataException if the tree top is invalid or the tuple does
-     *         not fit even into a freshly reset output buffer
-     */
-    private int outputRecord() throws HyracksDataException {
-        outputedTuple.clear();
-        sTree.getMin(sTreeTop);
-        if (!isEntryValid(sTreeTop)) {
-            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
-        }
-
-        // The minimum belongs to a different run: switch runs.
-        if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) {
-            openNewRun();
-        }
-
-        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
-        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
-        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-            // Cannot append: the output frame is full. Flush it to the run file
-            // and retry on the emptied buffer.
-            FrameUtils.flushFrame(outputBuffer, writer);
-            outputAppender.reset(outputBuffer, true);
-            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-                throw new HyracksDataException("Can not append to the ouput buffer in sort");
-            }
-            lastTupleIx = 0;
-        } else {
-            lastTupleIx++;
-        }
-        outputedTuple.set(tFrameIx, tOffset);
-        newRun = false;
-        return memMgr.unallocate(outputedTuple);
-
-    }
-
-    /**
-     * Computes the Poorman's Normalized Key of the first sort field of tuple
-     * {@code tIx}; returns 0 when no normalizer is configured.
-     *
-     * @param buffInArray backing array of the frame read by {@code fta}
-     */
-    private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
-        // Moved buffInArray out for better performance (not converting for each and every tuple)
-        int sfIdx = sortFields[0];
-        int tStart = fta.getTupleStartOffset(tIx);
-        int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
-        int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
-        // Absolute start = tuple start + field-slot area + field start relative to the data area.
-        int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
-        return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
-    }
-
-    /**
-     * Decides the run of tuple {@code tupIx} by comparing its sort fields with
-     * the last tuple written to the output buffer: a tuple that is not smaller
-     * stays in the current run, a smaller one goes to the next run. The first
-     * tuple after a run is opened always joins that run.
-     *
-     * @return curRunId or curRunId + 1
-     */
-    private int getRunId(FrameTupleAccessor fta, int tupIx) throws HyracksDataException {
-        // Comparing current record to last output record, it decides about current record's runId
-        if (newRun) { // Very first record for a new run
-            return curRunId;
-        }
-
-        byte[] lastRecBuff = outputBuffer.array();
-        lastRecordAccessor.reset(outputBuffer);
-        int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
-
-        ByteBuffer fr2 = fta.getBuffer();
-        byte[] curRecBuff = fr2.array();
-        int r2StartOffset = fta.getTupleStartOffset(tupIx);
-
-        for (int f = 0; f < comparators.length; ++f) {
-            int fIdx = sortFields[f];
-            // Field end offsets are read as 4-byte ints from the tuple's slot area;
-            // a field starts where the previous one ends (field 0 starts at 0).
-            int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
-            int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
-            int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
-            int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
-            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
-            int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
-            int l2 = f2End - f2Start;
-            int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
-            if (c != 0) {
-                // NOTE(review): inside `c != 0` this test is effectively `c < 0`.
-                if (c <= 0) {
-                    return curRunId;
-                } else {
-                    return (curRunId + 1);
-                }
-            }
-        }
-        // All sort fields equal to the last output tuple: it still fits the current run.
-        return curRunId;
-    }
-
-    /**
-     * Finishes the current run, if any: flushes its pending output tuples,
-     * closes its writer and registers a reader for it in {@code runs}. Then
-     * creates a managed workspace file for the next run and resets the
-     * per-run state (curRunId incremented, newRun set, lastTupleIx = -1).
-     */
-    private void openNewRun() throws HyracksDataException {
-        // A previous run exists: flush its tuples and close it first.
-        if (writer != null) {
-            if (outputAppender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(outputBuffer, writer);
-            }
-            outputAppender.reset(outputBuffer, true);
-            writer.close();
-            runs.add(writer.createReader());
-        }
-
-        // NOTE(review): the file-name prefix comes from ExternalSortRunGenerator,
-        // not from this class; confirm that this is intentional.
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                ExternalSortRunGenerator.class.getSimpleName());
-        writer = new RunFileWriter(file, ctx.getIOManager());
-        writer.open();
-        curRunId++;
-        newRun = true;
-        lastTupleIx = -1;
-    }
-
-    /**
-     * An entry is valid when its run id, frame index and offset are all
-     * non-negative; open() initializes the scratch entry to -1s.
-     */
-    private boolean isEntryValid(int[] entry) {
-        return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
deleted file mode 100644
index 5b01fb8..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-/**
- * @author pouria
- *         This class implements the run generator for sorting with replacement
- *         selection, where there is a limit on the output, i.e. we are looking
- *         for top-k tuples (first k smallest tuples w.r.t sorting keys).
- *         A SortMinMaxHeap is used as the selectionTree to decide the order of
- *         writing tuples into the runs, and also to prune tuples (if possible).
- *         Memory manager is based on a binary search tree and is used to
- *         allocate memory slots for tuples.
- *         The overall process is as follows (Assuming that the limit is K):
- *         - Read the input data frame by frame. For each tuple T in the current
- *         frame:
- *         - If currentRun R has reached the limit of K on the size, and (T >
- *         maximum tuple of R), then ignore T.
- *         - Otherwise, try to allocate a memory slot for writing T along with
- *         the attached header/footer (for memory management purpose)
- *         - If T can not be allocated, try to output as many tuples, currently
- *         resident in memory, as needed so that a free slot, large enough to
- *         hold T, gets created. MinMaxHeap decides about which tuple should be
- *         sent to the output at each step.
- *         - Write T into memory.
- *         - Calculate the runID of T (based on the last output tuple for the
- *         current run). It is either the current run or the next run. Also
- *         calculate Poorman's Normalized Key (PNK) for T, to make comparisons
- *         faster later.
- *         - Create a heap element for T, containing its runID, the slot ptr to
- *         its memory location, and its PNK.
- *         - If runID is the nextRun, insert the heap element into the heap, and
- *         increment the size of nextRun.
- *         - If runID is the currentRun, then:
- *         - If currentRun has not hit the limit of K, insert the element into
- *         the heap, and increase the currentRun size.
- *         - Otherwise, currentRun has hit the limit of K while T is less than
- *         the max. So discard the current max of the current run (by popping
- *         it from the heap and unallocating its memory location) and insert
- *         the heap element into the heap. There is no need to change the
- *         currentRun size, as an old element (the old max) is replaced by T.
- *         - Upon closing, write all the tuples, currently resident in memory,
- *         into their corresponding run(s).
- *         - Note that upon opening a new Run R, if size of R (based on stats)
- *         is S and (S > K), then (S-K) current maximum tuples of R (which are
- *         resident in memory) get discarded at the beginning. MinMax heap can
- *         be used to find these tuples.
- */
-public class OptimizedExternalSortRunGeneratorWithLimit implements IRunGenerator {
-
-    private final IHyracksTaskContext ctx;
-    private final int[] sortFields;
-    // Normalized-key computer for the first sort field; null when no factory was supplied.
-    private final INormalizedKeyComputer nkc;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDescriptor;
-    // Readers for the completed run files, in the order they were added.
-    private final List<IFrameReader> runs;
-
-    // Selection tree (a SortMinMaxHeap, created in open()) over the memory-resident tuples.
-    private ISelectionTree sTree;
-    private IMemoryManager memMgr;
-
-    // Memory budget passed to the memory manager in open().
-    private final int memSize;
-    // Used to read tuples in nextFrame().
-    private FrameTupleAccessor inputAccessor;
-    // Used to write tuples into the dedicated output buffer.
-    private FrameTupleAppender outputAppender;
-    // Dedicated output buffer used to write tuples into the run(s).
-    private ByteBuffer outputBuffer;
-    // Used to read the last output record back from the output buffer.
-    private FrameTupleAccessor lastRecordAccessor;
-    // Used by compareRecords() to read a record stored in a memory-manager frame (e.g. the heap maximum).
-    private FrameTupleAccessor fta2;
-    // Maximum number of tuples kept per run (the K of top-K).
-    private final int outputLimit;
-    // Tuples counted toward the current run: incremented when one is inserted for it and decremented when a
-    // maximum is discarded; writing a tuple out does not change it (presumably reset when a run is opened).
-    private int curRunSize;
-    // Tuples inserted for the next run so far.
-    private int nextRunSize;
-    // Index of the last output tuple in the dedicated output buffer.
-    private int lastTupleIx;
-    // Slot allocated by the memory manager for the incoming tuple.
-    private Slot allocationPtr;
-    // Slot of the tuple chosen by the selection tree for output.
-    private Slot outputedTuple;
-    // Slot of a heap maximum that is being discarded.
-    private Slot discard;
-    // Scratch array receiving the entry returned by sTree.getMin().
-    private int[] sTreeTop;
-    // Scratch array receiving the entry returned by sTree.peekMax()/getMax().
-    private int[] peek;
-    // Writer of the run currently being produced.
-    private RunFileWriter writer;
-    // Cleared once a tuple is written to the current run; presumably set again when a run is opened.
-    private boolean newRun;
-    private int curRunId;
-
-    /**
-     * Creates a top-K run generator.
-     *
-     * @param ctx task context (frame size, frame allocation, I/O manager and workspace files)
-     * @param sortFields indexes of the sort fields; sortFields[0] feeds the normalized key
-     * @param firstKeyNormalizerFactory factory for the normalized key of the first sort field; may be null
-     * @param comparatorFactories one comparator factory per sort field, in sort-field order
-     * @param recordDesc descriptor of the input records
-     * @param memSize memory budget handed to the memory manager in open()
-     * @param limit maximum number of tuples kept per run (the K of top-K)
-     */
-    public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx, int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDesc, int memSize, int limit) {
-        this.ctx = ctx;
-        this.sortFields = sortFields;
-        this.comparatorFactories = comparatorFactories;
-        this.recordDescriptor = recordDesc;
-        this.memSize = memSize;
-        this.outputLimit = limit;
-
-        if (firstKeyNormalizerFactory != null) {
-            this.nkc = firstKeyNormalizerFactory.createNormalizedKeyComputer();
-        } else {
-            this.nkc = null;
-        }
-
-        int keyCount = comparatorFactories.length;
-        this.comparators = new IBinaryComparator[keyCount];
-        for (int k = 0; k < keyCount; k++) {
-            this.comparators[k] = comparatorFactories[k].createBinaryComparator();
-        }
-        this.runs = new LinkedList<IFrameReader>();
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        runs.clear();
-        inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-        outputBuffer = ctx.allocateFrame();
-        outputAppender.reset(outputBuffer, true);
-        lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        this.memMgr = new BSTMemMgr(ctx, memSize);
-        this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
-        this.allocationPtr = new Slot();
-        this.outputedTuple = new Slot();
-        this.sTreeTop = new int[] { -1, -1, -1, -1 };
-        this.peek = new int[] { -1, -1, -1, -1 };
-        this.discard = new Slot();
-
-        curRunId = -1;
-        curRunSize = 0;
-        nextRunSize = 0;
-        openNewRun();
-    }
-
-    /**
-     * Consumes one input frame, keeping at most outputLimit tuples per run.
-     * <p>
-     * For each tuple: if the current run is full and the tuple is not smaller than the heap maximum, it is skipped.
-     * Otherwise a memory slot is allocated (tuples are written out via outputRecord() until enough space is freed),
-     * the tuple's run id is computed, and the tuple is either inserted into the selection tree (next run, or current
-     * run below the limit) or used to replace the maximum of a full current run. A tuple rejected after its slot was
-     * allocated has that slot released again.
-     *
-     * @param buffer the input frame
-     * @throws HyracksDataException if outputRecord() cannot free any space for a tuple
-     */
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        inputAccessor.reset(buffer);
-        // Fetched once per frame so getPNK() does not call array() for every tuple.
-        byte[] bufferArray = buffer.array();
-        int tupleCount = inputAccessor.getTupleCount();
-        for (int i = 0; i < tupleCount; i++) {
-            // Cheap pre-check before allocating: a full current run cannot take a tuple >= the heap maximum.
-            // NOTE(review): the heap maximum may belong to the next run; confirm this comparison is intended.
-            if (curRunSize >= outputLimit) {
-                sTree.peekMax(peek);
-                if (isEntryValid(peek)
-                        && compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
-                                peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
-                    continue;
-                }
-            }
-
-            allocationPtr.clear();
-            int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
-            memMgr.allocate(tLength, allocationPtr);
-            // Allocation failed: write tuples out until outputRecord() reports at least tLength freed, then retry.
-            while (allocationPtr.isNull()) {
-                int unAllocSize = -1;
-                while (unAllocSize < tLength) {
-                    unAllocSize = outputRecord();
-                    if (unAllocSize < 1) {
-                        throw new HyracksDataException(
-                                "Unable to allocate space for the new tuple, while there is no more tuple to output");
-                    }
-                }
-                memMgr.allocate(tLength, allocationPtr);
-            }
-
-            int pnk = getPNK(inputAccessor, i, bufferArray);
-            int runId = getRunId(inputAccessor, i);
-            if (runId != curRunId) { // tuple belongs to the next run
-                memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
-                int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
-                sTree.insert(entry);
-                nextRunSize++;
-                continue;
-            }
-            // Belongs to the current run, which still has room.
-            if (curRunSize < outputLimit) {
-                memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
-                int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
-                sTree.insert(entry);
-                curRunSize++;
-                continue;
-            }
-
-            // The current run is full: the tuple may only replace the maximum if it is not larger. The allocation
-            // loop above may have called outputRecord() and changed the tree, so the maximum is read again.
-            sTree.peekMax(peek);
-            if (!isEntryValid(peek)
-                    || compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
-                            peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
-                // Rejected. Release the slot allocated above so it is not leaked. With no valid maximum (presumably
-                // an empty tree) there is no resident tuple to replace, and comparing against it is not possible.
-                memMgr.unallocate(allocationPtr);
-                continue;
-            }
-            // Replace the maximum: drop it from the tree, free its slot, then store and insert the new tuple.
-            sTree.getMax(peek);
-            discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
-            memMgr.unallocate(discard);
-            memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
-            int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
-            sTree.insert(entry);
-        }
-    }
-
-    /**
-     * Intentionally a no-op: this generator performs no failure-specific cleanup.
-     */
-    @Override
-    public void fail() throws HyracksDataException {
-        // nothing to do
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        while (!sTree.isEmpty()) { // Outputting remaining elements in the
-                                   // selectionTree
-            outputRecordForClose();
-        }
-
-        if (outputAppender.getTupleCount() > 0) { // Writing out very last
-                                                  // resident records to file
-            FrameUtils.flushFrame(outputBuffer, writer);
-        }
-
-        writer.close();
-        runs.add(writer.createReader());
-        memMgr.close();
-    }
-
-    /**
-     * @return readers for the runs produced so far (the live internal list, not a copy)
-     */
-    public List<IFrameReader> getRuns() {
-        return this.runs;
-    }
-
-    /**
-     * Takes the minimum entry from the selection tree and writes its tuple out.
-     * <p>
-     * If the entry belongs to the current run, the tuple is appended to the output buffer (flushing the buffer to the
-     * run file when it is full) and its slot is released. Otherwise the current run is finished via openNewRun(). If
-     * the new run then holds more than outputLimit tuples, the surplus maxima are discarded from the tree and their
-     * slots released. Finally the minimum tuple is written as the first tuple of the new run and its slot released.
-     *
-     * @return the largest value returned by memMgr.unallocate() for the slots released by this call; nextFrame()
-     *         compares it with the size it needs to allocate
-     * @throws HyracksDataException if the tree top or an extracted maximum is invalid, or the tuple cannot be
-     *         appended to the output buffer
-     */
-    private int outputRecord() throws HyracksDataException {
-        outputedTuple.clear();
-        sTree.getMin(sTreeTop);
-        if (!isEntryValid(sTreeTop)) {
-            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
-        }
-        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
-        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
-        if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
-            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-                // Output frame is full: flush it to the run file and retry on the emptied buffer.
-                FrameUtils.flushFrame(outputBuffer, writer);
-                outputAppender.reset(outputBuffer, true);
-                if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-                    throw new HyracksDataException("Can not append to the ouput buffer in sort");
-                }
-                lastTupleIx = 0;
-            } else {
-                lastTupleIx++;
-            }
-            outputedTuple.set(tFrameIx, tOffset);
-            newRun = false;
-            return memMgr.unallocate(outputedTuple);
-        }
-
-        // The minimum belongs to the next run: finish the current run first.
-        openNewRun();
-
-        // Discard the largest tuples of the new run beyond the output limit.
-        int popCount = curRunSize - outputLimit;
-        int maxFreedSpace = 0;
-        for (int p = 0; p < popCount; p++) {
-            sTree.getMax(peek);
-            if (!isEntryValid(peek)) {
-                throw new HyracksDataException("Invalid Maximum extracted from MinMaxHeap");
-            }
-            discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
-            int freed = memMgr.unallocate(discard);
-            if (freed > maxFreedSpace) {
-                maxFreedSpace = freed;
-            }
-            curRunSize--;
-        }
-
-        // getMin() already removed the minimum from the tree (close() relies on that), so it must always be written
-        // out and its slot released, even when maxima were discarded above. Otherwise the tuple would be lost and its
-        // slot leaked. openNewRun() is expected to have flushed the previous run, leaving the output buffer empty,
-        // so the tuple has to fit.
-        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-            throw new HyracksDataException("Can not append to the ouput buffer in sort");
-        }
-        lastTupleIx = 0;
-        outputedTuple.set(tFrameIx, tOffset);
-        newRun = false;
-        return Math.max(maxFreedSpace, memMgr.unallocate(outputedTuple));
-    }
-
-    /**
-     * Close-time variant of outputRecord(). It takes the minimum entry from the selection tree, switches to a new
-     * run if the entry carries a different run id, and appends the tuple to the output buffer (flushing the buffer
-     * to the run file and resetting it when the append fails).
-     * <p>
-     * Unlike outputRecord(), it neither releases the tuple's memory slot nor updates lastTupleIx/newRun; close()
-     * calls memMgr.close() once the tree is drained.
-     *
-     * @throws HyracksDataException if the tree top is invalid or the tuple cannot be appended even after the
-     *         buffer has been flushed and reset
-     */
-    private void outputRecordForClose() throws HyracksDataException {
-        sTree.getMin(sTreeTop);
-        if (!isEntryValid(sTreeTop)) {
-            throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
-        }
-        int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
-        int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
-        // The minimum belongs to a different run: switch runs.
-        if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
-            openNewRun();
-        }
-
-        if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-            // Cannot append: the output frame is full. Flush it and retry on the emptied buffer.
-            FrameUtils.flushFrame(outputBuffer, writer);
-            outputAppender.reset(outputBuffer, true);
-            if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
-                throw new HyracksDataException("Can not append to the ouput buffer in sort");
-            }
-        }
-    }
-
-    /**
-     * Computes the Poorman's Normalized Key of the first sort field of tuple {@code tIx}.
-     *
-     * @param fta accessor positioned on the frame holding the tuple
-     * @param tIx index of the tuple within that frame
-     * @param buffInArray backing array of the frame, passed in so it is fetched once per frame, not per tuple
-     * @return the normalized key, or 0 when no normalizer is configured
-     */
-    private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
-        int firstKeyField = sortFields[0];
-        int fieldStartRel = fta.getFieldStartOffset(tIx, firstKeyField);
-        int fieldLength = fta.getFieldEndOffset(tIx, firstKeyField) - fieldStartRel;
-        // Absolute offset = tuple start + field-slot area + field start relative to the data area.
-        int fieldStartAbs = fta.getTupleStartOffset(tIx) + fta.getFieldSlotsLength() + fieldStartRel;
-        if (nkc == null) {
-            return 0;
-        }
-        return nkc.normalize(buffInArray, fieldStartAbs, fieldLength);
-    }
-
-    /**
-     * Decides the run of tuple {@code tupIx} by comparing its sort fields with the last tuple written to the output
-     * buffer. A tuple that is not smaller stays in the current run; a smaller one goes to the next run. The first
-     * tuple after a run is opened always joins that run.
-     *
-     * @return curRunId or curRunId + 1
-     */
-    private int getRunId(FrameTupleAccessor fta, int tupIx) throws HyracksDataException {
-        if (newRun) {
-            // Nothing has been written to the freshly opened run yet.
-            return curRunId;
-        }
-
-        lastRecordAccessor.reset(outputBuffer);
-        byte[] lastBytes = outputBuffer.array();
-        int lastTupleStart = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
-
-        ByteBuffer candBuffer = fta.getBuffer();
-        byte[] candBytes = candBuffer.array();
-        int candTupleStart = fta.getTupleStartOffset(tupIx);
-
-        for (int k = 0; k < comparators.length; ++k) {
-            int field = sortFields[k];
-            // Field end offsets are read as 4-byte ints from the tuple's slot area;
-            // a field starts where the previous one ends (field 0 starts at 0).
-            int lastFieldStart = 0;
-            int candFieldStart = 0;
-            if (field != 0) {
-                lastFieldStart = outputBuffer.getInt(lastTupleStart + (field - 1) * 4);
-                candFieldStart = candBuffer.getInt(candTupleStart + (field - 1) * 4);
-            }
-            int lastFieldEnd = outputBuffer.getInt(lastTupleStart + field * 4);
-            int candFieldEnd = candBuffer.getInt(candTupleStart + field * 4);
-
-            int lastOffset = lastTupleStart + lastRecordAccessor.getFieldSlotsLength() + lastFieldStart;
-            int candOffset = candTupleStart + fta.getFieldSlotsLength() + candFieldStart;
-            int cmp = comparators[k].compare(lastBytes, lastOffset, lastFieldEnd - lastFieldStart, candBytes,
-                    candOffset, candFieldEnd - candFieldStart);
-            if (cmp < 0) {
-                return curRunId;
-            }
-            if (cmp > 0) {
-                return curRunId + 1;
-            }
-        }
-        // Every sort field equals the last output tuple: it still fits the current run.
-        return curRunId;
-    }
-
-    /**
-     * Compares tuple {@code ix1} of {@code fta1} with the tuple stored in memory-manager frame
-     * {@code fix2} at slot offset {@code offset2}, field by field on the sort fields. The second
-     * tuple's data is read starting {@code BSTNodeUtil.HEADER_SIZE} bytes after {@code offset2}.
-     * As a side effect, the shared accessor {@code fta2} is reset onto that frame.
-     *
-     * @return the first non-zero comparator result (negative when the first tuple sorts before the
-     *         second), or 0 if every sort field compares equal
-     */
-    private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2, int offset2) throws HyracksDataException {
-        ByteBuffer buff1 = fta1.getBuffer();
-        byte[] recBuff1 = buff1.array();
-        int offset1 = fta1.getTupleStartOffset(ix1);
-
-        // Skip the slot header that precedes the tuple in memory-managed frames.
-        offset2 += BSTNodeUtil.HEADER_SIZE;
-        ByteBuffer buff2 = memMgr.getFrame(fix2);
-        fta2.reset(buff2);
-        byte[] recBuff2 = buff2.array();
-
-        for (int f = 0; f < comparators.length; ++f) {
-            int fIdx = sortFields[f];
-            // Field i ends at the int stored at tupleStart + 4 * i and starts where field i - 1 ends.
-            int f1Start = fIdx == 0 ? 0 : buff1.getInt(offset1 + (fIdx - 1) * 4);
-            int f1End = buff1.getInt(offset1 + fIdx * 4);
-            int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
-            int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : buff2.getInt(offset2 + (fIdx - 1) * 4);
-            int f2End = buff2.getInt(offset2 + fIdx * 4);
-            int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
-            int l2 = f2End - f2Start;
-            int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
-
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-
-    }
-
-    /**
-     * Finishes the current run (if any) and starts a new one.
-     * <p>
-     * If a writer already exists, any buffered output tuples are flushed to it, the output frame is
-     * reset, the writer is closed, and a reader over it is added to {@code runs}. A new managed
-     * workspace file is then created and opened as the next run. {@code curRunId} is incremented,
-     * {@code curRunSize} takes the value accumulated in {@code nextRunSize}, and the per-run state
-     * ({@code newRun}, {@code nextRunSize}, {@code lastTupleIx}) is reset.
-     */
-    private void openNewRun() throws HyracksDataException {
-        if (writer != null) { // There is a prev run, so flush its tuples and
-                              // close it first
-            if (outputAppender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(outputBuffer, writer);
-            }
-            outputAppender.reset(outputBuffer, true);
-            writer.close();
-            runs.add(writer.createReader());
-        }
-
-        // The workspace file name prefix comes from ExternalSortRunGenerator, not from this class.
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                ExternalSortRunGenerator.class.getSimpleName());
-        writer = new RunFileWriter(file, ctx.getIOManager());
-        writer.open();
-        curRunId++;
-        newRun = true;
-        curRunSize = nextRunSize;
-        nextRunSize = 0;
-        // No tuple has been output to the new run yet.
-        lastTupleIx = -1;
-    }
-
-    /**
-     * Returns whether a selection-tree entry refers to a real tuple, i.e. its run id, frame index
-     * and offset are all non-negative (SortMinHeap fills its results with -1 when it is empty).
-     */
-    private boolean isEntryValid(int[] entry) {
-        return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java
new file mode 100644
index 0000000..c68f1e7
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+
+/**
+ * Mutable holder that pairs a run's {@link IFrameReader} with the maximum frame size
+ * recorded for that run.
+ */
+public class RunAndMaxFrameSizePair {
+    /** Reader over the frames of the run. */
+    public IFrameReader run;
+    /** Maximum frame size recorded for {@link #run}; replaceable via {@link #updateSize(int)}. */
+    public int maxFrameSize;
+
+    /**
+     * Creates a pair from a run reader and its maximum frame size.
+     *
+     * @param run reader over the frames of the run
+     * @param maxFrameSize maximum frame size recorded for the run
+     */
+    public RunAndMaxFrameSizePair(IFrameReader run, int maxFrameSize) {
+        this.run = run;
+        this.maxFrameSize = maxFrameSize;
+    }
+
+    /**
+     * Replaces the recorded maximum frame size.
+     *
+     * @param newMaxSize the new value of {@link #maxFrameSize}
+     */
+    void updateSize(int newMaxSize) {
+        maxFrameSize = newMaxSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index cf0d0ad..f013594 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -1,23 +1,23 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
-import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -25,52 +25,61 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
 public class RunMergingFrameReader implements IFrameReader {
     private final IHyracksTaskContext ctx;
-    private final IFrameReader[] runCursors;
-    private final List<ByteBuffer> inFrames;
+    private final List<? extends IFrameReader> runCursors;
+    private final List<? extends IFrame> inFrames;
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
     private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
-    private final FrameTupleAppender outFrameAppender;
+    private final int topK;
+    private int tupleCount;
+    private FrameTupleAppender outFrameAppender;
     private ReferencedPriorityQueue topTuples;
     private int[] tupleIndexes;
-    private FrameTupleAccessor[] tupleAccessors;
+    private IFrameTupleAccessor[] tupleAccessors;
 
-    public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
-            int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
-            RecordDescriptor recordDesc) {
+    public RunMergingFrameReader(IHyracksTaskContext ctx, List<? extends IFrameReader> runs,
+            List<? extends IFrame> inFrames, int[] sortFields, IBinaryComparator[] comparators,
+            INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc) {
+        this(ctx, runs, inFrames, sortFields, comparators, nmkComputer, recordDesc, Integer.MAX_VALUE);
+    }
+
+    public RunMergingFrameReader(IHyracksTaskContext ctx, List<? extends IFrameReader> runs,
+            List<? extends IFrame> inFrames, int[] sortFields, IBinaryComparator[] comparators,
+            INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc, int topK) {
         this.ctx = ctx;
-        this.runCursors = runCursors;
+        this.runCursors = runs;
         this.inFrames = inFrames;
         this.sortFields = sortFields;
         this.comparators = comparators;
         this.nmkComputer = nmkComputer;
         this.recordDesc = recordDesc;
-        outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+        this.topK = topK;
     }
 
     @Override
     public void open() throws HyracksDataException {
-        tupleAccessors = new FrameTupleAccessor[runCursors.length];
+        tupleCount = 0;
+        tupleAccessors = new IFrameTupleAccessor[runCursors.size()];
+        outFrameAppender = new FrameTupleAppender();
         Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-        topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator,
-                sortFields, nmkComputer);
-        tupleIndexes = new int[runCursors.length];
-        for (int i = 0; i < runCursors.length; i++) {
+        topTuples = new ReferencedPriorityQueue(runCursors.size(), comparator, sortFields, nmkComputer);
+        tupleIndexes = new int[runCursors.size()];
+        for (int i = 0; i < runCursors.size(); i++) {
             tupleIndexes[i] = 0;
             int runIndex = topTuples.peek().getRunid();
-            runCursors[runIndex].open();
-            if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
-                tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
-                tupleAccessors[runIndex].reset(inFrames.get(runIndex));
-                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+            runCursors.get(runIndex).open();
+            if (runCursors.get(runIndex).nextFrame(inFrames.get(runIndex))) {
+                tupleAccessors[runIndex] = new GroupFrameAccessor(ctx.getInitialFrameSize(), recordDesc);
+                tupleAccessors[runIndex].reset(inFrames.get(runIndex).getBuffer());
+                setNextTopTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors, topTuples);
             } else {
                 closeRun(runIndex, runCursors, tupleAccessors);
                 topTuples.pop();
@@ -79,20 +88,21 @@ public class RunMergingFrameReader implements IFrameReader {
     }
 
     @Override
-    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        outFrameAppender.reset(buffer, true);
-        while (!topTuples.areRunsExhausted()) {
+    public boolean nextFrame(IFrame outFrame) throws HyracksDataException {
+        outFrameAppender.reset(outFrame, true);
+        while (!topTuples.areRunsExhausted() && tupleCount < topK) {
             ReferenceEntry top = topTuples.peek();
             int runIndex = top.getRunid();
-            FrameTupleAccessor fta = top.getAccessor();
+            IFrameTupleAccessor fta = top.getAccessor();
             int tupleIndex = top.getTupleIndex();
 
             if (!outFrameAppender.append(fta, tupleIndex)) {
                 return true;
+            } else {
+                tupleCount++;
             }
-
             ++tupleIndexes[runIndex];
-            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+            setNextTopTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors, topTuples);
         }
 
         if (outFrameAppender.getTupleCount() > 0) {
@@ -103,14 +113,15 @@ public class RunMergingFrameReader implements IFrameReader {
 
     @Override
     public void close() throws HyracksDataException {
-        for (int i = 0; i < runCursors.length; ++i) {
+        for (int i = 0; i < runCursors.size(); ++i) {
             closeRun(i, runCursors, tupleAccessors);
         }
     }
 
-    private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
-        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+    private static void setNextTopTuple(int runIndex, int[] tupleIndexes, List<? extends IFrameReader> runCursors,
+            List<? extends IFrame> inFrames, IFrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
+            throws HyracksDataException {
+        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors);
         if (exists) {
             topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
         } else {
@@ -119,15 +130,16 @@ public class RunMergingFrameReader implements IFrameReader {
         }
     }
 
-    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
-            FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
-        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+    private static boolean hasNextTuple(int runIndex, int[] tupleIndexes, List<? extends IFrameReader> runCursors,
+            List<? extends IFrame> inFrames, IFrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
+        if (tupleAccessors[runIndex] == null || runCursors.get(runIndex) == null) {
             return false;
         } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
-            ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
-            if (runCursors[runIndex].nextFrame(buf)) {
+            IFrame frame = inFrames.get(runIndex);
+            if (runCursors.get(runIndex).nextFrame(frame)) {
                 tupleIndexes[runIndex] = 0;
-                return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+                tupleAccessors[runIndex].reset(frame.getBuffer());
+                return hasNextTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors);
             } else {
                 return false;
             }
@@ -136,11 +148,12 @@ public class RunMergingFrameReader implements IFrameReader {
         }
     }
 
-    private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
+    private static void closeRun(int index, List<? extends IFrameReader> runCursors,
+            IFrameTupleAccessor[] tupleAccessors)
             throws HyracksDataException {
-        if (runCursors[index] != null) {
-            runCursors[index].close();
-            runCursors[index] = null;
+        if (runCursors.get(index) != null) {
+            runCursors.get(index).close();
+            runCursors.set(index, null);
             tupleAccessors[index] = null;
         }
     }
@@ -153,8 +166,8 @@ public class RunMergingFrameReader implements IFrameReader {
                 if (nmk1 != nmk2) {
                     return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
                 }
-                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
-                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+                IFrameTupleAccessor fta1 = tp1.getAccessor();
+                IFrameTupleAccessor fta2 = tp2.getAccessor();
                 byte[] b1 = fta1.getBuffer().array();
                 byte[] b2 = fta2.getBuffer().array();
                 int[] tPointers1 = tp1.getTPointers();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
deleted file mode 100644
index 73f99dd..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-/**
- * @author pouria
- *         Location of a tuple slot in memory; the slot itself may be free or allocated.
- *         Memory is an ordered list of frames, and a slot is identified by a pair of
- *         integers:
- *         <ul>
- *         <li>{@code frameIx}: the index of the frame within that list;</li>
- *         <li>{@code offset}: the starting offset of the slot within that frame.</li>
- *         </ul>
- *         A slot whose frame index or offset equals {@code BSTNodeUtil.INVALID_INDEX} is
- *         considered null (see {@link #isNull()}).
- */
-public class Slot {
-
-    // Index of the frame holding the slot, or BSTNodeUtil.INVALID_INDEX.
-    private int frameIx;
-    // Starting offset of the slot within its frame, or BSTNodeUtil.INVALID_INDEX.
-    private int offset;
-
-    /** Creates a null slot (both coordinates set to {@code BSTNodeUtil.INVALID_INDEX}). */
-    public Slot() {
-        this.frameIx = BSTNodeUtil.INVALID_INDEX;
-        this.offset = BSTNodeUtil.INVALID_INDEX;
-    }
-
-    /** Creates a slot at the given frame index and offset. */
-    public Slot(int frameIx, int offset) {
-        this.frameIx = frameIx;
-        this.offset = offset;
-    }
-
-    /** Sets both coordinates of this slot. */
-    public void set(int frameIx, int offset) {
-        this.frameIx = frameIx;
-        this.offset = offset;
-    }
-
-    /** Returns the index of the frame holding this slot. */
-    public int getFrameIx() {
-        return frameIx;
-    }
-
-    /** Sets the index of the frame holding this slot. */
-    public void setFrameIx(int frameIx) {
-        this.frameIx = frameIx;
-    }
-
-    /** Returns the starting offset of this slot within its frame. */
-    public int getOffset() {
-        return offset;
-    }
-
-    /** Sets the starting offset of this slot within its frame. */
-    public void setOffset(int offset) {
-        this.offset = offset;
-    }
-
-    /** Returns true if either coordinate equals {@code BSTNodeUtil.INVALID_INDEX}. */
-    public boolean isNull() {
-        return (frameIx == BSTNodeUtil.INVALID_INDEX) || (offset == BSTNodeUtil.INVALID_INDEX);
-    }
-
-    /** Resets this slot to null (both coordinates set to {@code BSTNodeUtil.INVALID_INDEX}). */
-    public void clear() {
-        this.frameIx = BSTNodeUtil.INVALID_INDEX;
-        this.offset = BSTNodeUtil.INVALID_INDEX;
-    }
-
-    /** Copies both coordinates from {@code s} into this slot. */
-    public void copy(Slot s) {
-        this.frameIx = s.getFrameIx();
-        this.offset = s.getOffset();
-    }
-
-    // Renders the slot as "(frameIx, offset)". NOTE(review): lacks @Override.
-    public String toString() {
-        return "(" + frameIx + ", " + offset + ")";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
deleted file mode 100644
index 1cde75f..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * @author pouria
- *         Minimum binary heap used as the selection tree for sorting with replacement
- *         (replacement selection). Only the minimum is accessible: {@link #getMax(int[])} and
- *         {@link #peekMax(int[])} throw {@link IllegalStateException}.
- *         <p>
- *         The heap is stored flat in {@code elements}; each element occupies
- *         {@code ELEMENT_SIZE} (4) consecutive ints laid out as
- *         [run id][frame index][offset][poor man's normalized key]. Two elements A and B are
- *         ordered as follows:
- *         <ul>
- *         <li>if their run ids differ, the element with the smaller run id is smaller;</li>
- *         <li>otherwise, if their normalized keys differ, the element with the smaller key
- *         (compared as an unsigned 32-bit value) is smaller;</li>
- *         <li>otherwise the referenced tuples are compared on the sort fields with the
- *         configured comparators.</li>
- *         </ul>
- */
-public class SortMinHeap implements ISelectionTree {
-
-    // Positions of each component within a heap element.
-    static final int RUN_ID_IX = 0;
-    static final int FRAME_IX = 1;
-    static final int OFFSET_IX = 2;
-    private static final int PNK_IX = 3;
-    // Number of ints per heap element.
-    private static final int ELEMENT_SIZE = 4;
-    // Initial length (in ints) of the backing array, i.e. room for 128 elements.
-    private static final int INIT_ARRAY_SIZE = 512;
-
-    private final int[] sortFields;
-    private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDescriptor;
-    // Reusable accessors for the two frames being compared.
-    private final FrameTupleAccessor fta1;
-    private final FrameTupleAccessor fta2;
-    // Flat heap storage; element k occupies ints [k * ELEMENT_SIZE, (k + 1) * ELEMENT_SIZE).
-    private int[] elements;
-    // Int index one past the last used slot (element count * ELEMENT_SIZE).
-    private int nextIx;
-    // Resolves the frame indices stored in elements to frames.
-    private final IMemoryManager memMgr;
-    // NOTE(review): getMin() reassigns this field to the fresh array returned by delete(0), so it
-    // does not actually avoid allocation as the trailing comment claims.
-    private int[] top; // Used as a temp variable to access the top, to avoid object creation
-
-    /**
-     * @param ctx context supplying the frame size for the internal tuple accessors
-     * @param sortFields indices of the fields to sort on, in comparator order
-     * @param comparatorFactories one comparator factory per sort field
-     * @param recordDesc descriptor of the tuples being sorted
-     * @param memMgr memory manager whose frames the heap elements refer to
-     */
-    public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDesc, IMemoryManager memMgr) {
-        this.sortFields = sortFields;
-        this.comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            this.comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        this.recordDescriptor = recordDesc;
-        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        this.memMgr = memMgr;
-        this.top = new int[ELEMENT_SIZE];
-        Arrays.fill(top, -1);
-        this.elements = new int[INIT_ARRAY_SIZE];
-        Arrays.fill(elements, -1);
-        this.nextIx = 0;
-    }
-
-    /**
-     * Removes the minimum element and copies its components into {@code result}, or fills
-     * {@code result[0..3]} with -1 if the heap is empty.
-     * <p>
-     * Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
-     */
-    @Override
-    public void getMin(int[] result) throws HyracksDataException {
-        if (nextIx == 0) {
-            result[0] = result[1] = result[2] = result[3] = -1;
-            return;
-        }
-
-        top = delete(0);
-        for (int i = 0; i < top.length; i++) {
-            result[i] = top[i];
-        }
-    }
-
-    /**
-     * Copies the minimum element into {@code result} without removing it, or fills
-     * {@code result[0..3]} with -1 if the heap is empty.
-     */
-    @Override
-    public void peekMin(int[] result) {
-        if (nextIx == 0) {
-            result[0] = result[1] = result[2] = result[3] = -1;
-            return;
-        }
-        for (int i = 0; i < ELEMENT_SIZE; i++) {
-            result[i] = elements[i];
-        }
-    }
-
-    /**
-     * Inserts element {@code e} (its first {@code ELEMENT_SIZE} ints), doubling the backing array
-     * when it is full.
-     */
-    @Override
-    public void insert(int[] e) throws HyracksDataException {
-        // nextIx and the array length are both multiples of ELEMENT_SIZE, so this single check
-        // guarantees room for a whole element.
-        if (nextIx >= elements.length) {
-            elements = Arrays.copyOf(elements, elements.length * 2);
-        }
-        for (int i = 0; i < ELEMENT_SIZE; i++) {
-            elements[nextIx + i] = e[i];
-        }
-        siftUp(nextIx);
-        nextIx += ELEMENT_SIZE;
-
-    }
-
-    /** Empties the heap, clearing every slot back to -1. */
-    @Override
-    public void reset() {
-        Arrays.fill(elements, -1);
-        nextIx = 0;
-    }
-
-    /** Returns true when no element is stored. */
-    @Override
-    public boolean isEmpty() {
-        return (nextIx < ELEMENT_SIZE);
-    }
-
-    // Debug helper meant to return the number of elements.
-    // NOTE(review): under-reports by one for a non-empty heap: with one element (nextIx == 4) this
-    // returns (4 - 1) / 4 == 0. The element count is nextIx / ELEMENT_SIZE.
-    public int _debugGetSize() {
-        return (nextIx > 0 ? (nextIx - 1) / 4 : 0);
-    }
-
-    /**
-     * Removes the element starting at int index {@code nix} and returns a copy of it. The last
-     * element is moved into the freed position and then sifted up or down to restore heap order.
-     * NOTE(review): only called with nix == 0 in this class; for nix equal to the last element's
-     * index the moved element would be written past nextIx, so other callers should be checked.
-     */
-    private int[] delete(int nix) throws HyracksDataException {
-        int[] nv = Arrays.copyOfRange(elements, nix, nix + ELEMENT_SIZE);
-        int[] lastElem = removeLast();
-
-        if (nextIx == 0) {
-            return nv;
-        }
-
-        for (int i = 0; i < ELEMENT_SIZE; i++) {
-            elements[nix + i] = lastElem[i];
-        }
-        int pIx = getParent(nix);
-        if (pIx > -1 && (compare(lastElem, Arrays.copyOfRange(elements, pIx, pIx + ELEMENT_SIZE)) < 0)) {
-            siftUp(nix);
-        } else {
-            siftDown(nix);
-        }
-        return nv;
-    }
-
-    /**
-     * Removes and returns the last element, clearing its slots to -1; returns an all -1 element if
-     * the heap is empty.
-     */
-    private int[] removeLast() {
-        if (nextIx < ELEMENT_SIZE) { //this is the very last element
-            return new int[] { -1, -1, -1, -1 };
-        }
-        int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
-        Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
-        nextIx -= ELEMENT_SIZE;
-        return l;
-    }
-
-    /** Moves the element at int index {@code nodeIx} towards the root while it is smaller than its parent. */
-    private void siftUp(int nodeIx) throws HyracksDataException {
-        int p = getParent(nodeIx);
-        if (p < 0) {
-            return;
-        }
-        while (p > -1 && (compare(nodeIx, p) < 0)) {
-            swap(p, nodeIx);
-            nodeIx = p;
-            p = getParent(nodeIx);
-            if (p < 0) { // We are at the root
-                return;
-            }
-        }
-    }
-
-    /** Moves the element at int index {@code nodeIx} down while its smaller child is smaller than it. */
-    private void siftDown(int nodeIx) throws HyracksDataException {
-        int mix = getMinOfChildren(nodeIx);
-        if (mix < 0) {
-            return;
-        }
-        while (mix > -1 && (compare(mix, nodeIx) < 0)) {
-            swap(mix, nodeIx);
-            nodeIx = mix;
-            mix = getMinOfChildren(nodeIx);
-            if (mix < 0) { // We hit the leaf level
-                return;
-            }
-        }
-    }
-
-    // first < sec : -1
-    // Compares the elements starting at the two int indices.
-    // NOTE(review): copies both elements into new arrays on every comparison.
-    private int compare(int nodeSIx1, int nodeSIx2) throws HyracksDataException {
-        int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE);
-        int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE);
-        return (compare(n1, n2));
-    }
-
-    // first < sec : -1
-    // Orders two elements by run id, then by unsigned normalized key, then by the referenced tuples.
-    private int compare(int[] n1, int[] n2) throws HyracksDataException {
-        // Compare Run Numbers
-        if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
-            return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
-        }
-
-        // Compare Poor man Normalized Keys
-        if (n1[PNK_IX] != n2[PNK_IX]) {
-            return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
-        }
-
-        return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
-    }
-
-    /**
-     * Compares two tuples field by field on the sort fields. Each tuple's data starts
-     * {@code BSTNodeUtil.HEADER_SIZE} bytes after its given slot offset; the int at
-     * tupleStart + 4 * i is read as the end offset of field i.
-     *
-     * @return the first non-zero comparator result, or 0 if all sort fields are equal
-     */
-    private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset)
-            throws HyracksDataException {
-        byte[] b1 = fr1.array();
-        byte[] b2 = fr2.array();
-        fta1.reset(fr1);
-        fta2.reset(fr2);
-        int headerLen = BSTNodeUtil.HEADER_SIZE;
-        r1StartOffset += headerLen;
-        r2StartOffset += headerLen;
-        for (int f = 0; f < comparators.length; ++f) {
-            int fIdx = sortFields[f];
-            int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
-            int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
-            int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
-            int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
-            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
-            int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
-            int l2 = f2End - f2Start;
-
-            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-
-    private int getMinOfChildren(int nix) throws HyracksDataException { // returns index of min child
-        int lix = getLeftChild(nix);
-        if (lix < 0) {
-            return -1;
-        }
-        int rix = getRightChild(nix);
-        if (rix < 0) {
-            return lix;
-        }
-        return ((compare(lix, rix) < 0) ? lix : rix);
-    }
-
-    //Assumption: n1Ix and n2Ix are starting indices of two elements
-    private void swap(int n1Ix, int n2Ix) {
-        int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
-        for (int i = 0; i < ELEMENT_SIZE; i++) {
-            elements[n1Ix + i] = elements[n2Ix + i];
-            elements[n2Ix + i] = temp[i];
-        }
-    }
-
-    // Int index of the left child (element k -> element 2k + 1), or -1 if it does not exist.
-    private int getLeftChild(int ix) {
-        int lix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + ELEMENT_SIZE;
-        return ((lix < nextIx) ? lix : -1);
-    }
-
-    // Int index of the right child (element k -> element 2k + 2), or -1 if it does not exist.
-    private int getRightChild(int ix) {
-        int rix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
-        return ((rix < nextIx) ? rix : -1);
-    }
-
-    // Int index of the parent (element k -> element (k - 1) / 2), or -1 for the root.
-    private int getParent(int ix) {
-        if (ix <= 0) {
-            return -1;
-        }
-        return ((ix - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
-    }
-
-    // Resolves a frame index stored in an element through the memory manager.
-    private ByteBuffer getFrame(int frameIx) {
-        return (memMgr.getFrame(frameIx));
-    }
-
-    /** Unsupported: this heap only exposes its minimum. */
-    @Override
-    public void getMax(int[] result) {
-        throw new IllegalStateException("getMax() method not applicable to Min Heap");
-    }
-
-    /** Unsupported: this heap only exposes its minimum. */
-    // NOTE(review): the message names getMax() although this is peekMax().
-    @Override
-    public void peekMax(int[] result) {
-        throw new IllegalStateException("getMax() method not applicable to Min Heap");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
deleted file mode 100644
index 12aa8a1..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * @author pouria
- *         Implements a MinMax binary heap, used as the selection tree, in
- *         sorting with replacement. Check SortMinHeap for details on comparing
- *         elements.
- */
-public class SortMinMaxHeap implements ISelectionTree {
-    static final int RUN_ID_IX = 0;
-    static final int FRAME_IX = 1;
-    static final int OFFSET_IX = 2;
-    private static final int PNK_IX = 3;
-    private static final int NOT_EXIST = -1;
-    private static final int ELEMENT_SIZE = 4;
-    private static final int INIT_ARRAY_SIZE = 512;
-
-    private final int[] sortFields;
-    private final IBinaryComparator[] comparators;
-    private final RecordDescriptor recordDescriptor;
-    private final FrameTupleAccessor fta1;
-    private final FrameTupleAccessor fta2;
-
-    private int[] elements;
-    private int nextIx;
-
-    private final IMemoryManager memMgr;
-
-    public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDesc, IMemoryManager memMgr) {
-        this.sortFields = sortFields;
-        this.comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            this.comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-        this.recordDescriptor = recordDesc;
-        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-        this.memMgr = memMgr;
-        this.elements = new int[INIT_ARRAY_SIZE];
-        Arrays.fill(elements, -1);
-        this.nextIx = 0;
-    }
-
-    @Override
-    public void insert(int[] element) throws HyracksDataException {
-        if (nextIx >= elements.length) {
-            elements = Arrays.copyOf(elements, elements.length * 2);
-        }
-        for (int i = 0; i < ELEMENT_SIZE; i++) {
-            elements[nextIx + i] = element[i];
-        }
-        nextIx += ELEMENT_SIZE;
-        bubbleUp(nextIx - ELEMENT_SIZE);
-    }
-
-    @Override
-    public void getMin(int[] result) throws HyracksDataException {
-        if (nextIx == 0) {
-            result[0] = result[1] = result[2] = result[3] = -1;
-            return;
-        }
-
-        int[] topElem = delete(0);
-        for (int x = 0; x < ELEMENT_SIZE; x++) {
-            result[x] = topElem[x];
-        }
-    }
-
-    @Override
-    public void reset() {
-        Arrays.fill(elements, -1);
-        nextIx = 0;
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return (nextIx < ELEMENT_SIZE);
-    }
-
-    @Override
-    public void peekMin(int[] result) {
-        if (nextIx == 0) {
-            result[0] = result[1] = result[2] = result[3] = -1;
-            return;
-        }
-
-        for (int x = 0; x < ELEMENT_SIZE; x++) {
-            result[x] = elements[x];
-        }
-    }
-
-    @Override
-    public void getMax(int[] result) throws HyracksDataException {
-        if (nextIx == ELEMENT_SIZE) {
-            int[] topElement = removeLast();
-            for (int x = 0; x < ELEMENT_SIZE; x++) {
-                result[x] = topElement[x];
-            }
-            return;
-        }
-
-        if (nextIx > ELEMENT_SIZE) {
-            int lc = getLeftChild(0);
-            int rc = getRightChild(0);
-            int maxIx = lc;
-
-            if (rc != -1) {
-                maxIx = compare(lc, rc) < 0 ? rc : lc;
-            }
-
-            int[] maxElem = delete(maxIx);
-            for (int x = 0; x < ELEMENT_SIZE; x++) {
-                result[x] = maxElem[x];
-            }
-            return;
-        }
-
-        result[0] = result[1] = result[2] = result[3] = -1;
-
-    }
-
-    @Override
-    public void peekMax(int[] result) throws HyracksDataException {
-        if (nextIx == ELEMENT_SIZE) {
-            for (int i = 0; i < ELEMENT_SIZE; i++) {
-                result[i] = elements[i];
-            }
-            return;
-        }
-        if (nextIx > ELEMENT_SIZE) {
-            int lc = getLeftChild(0);
-            int rc = getRightChild(0);
-            int maxIx = lc;
-
-            if (rc != -1) {
-                maxIx = compare(lc, rc) < 0 ? rc : lc;
-            }
-
-            for (int x = 0; x < ELEMENT_SIZE; x++) {
-                result[x] = elements[maxIx + x];
-            }
-
-            return;
-        }
-        result[0] = result[1] = result[2] = result[3] = -1;
-    }
-
-    private int[] delete(int delIx) throws HyracksDataException {
-        int s = nextIx;
-        if (nextIx > ELEMENT_SIZE) {
-            int[] delEntry = Arrays.copyOfRange(elements, delIx, delIx + ELEMENT_SIZE);
-            int[] last = removeLast();
-            if (delIx != (s - ELEMENT_SIZE)) {
-                for (int x = 0; x < ELEMENT_SIZE; x++) {
-                    elements[delIx + x] = last[x];
-                }
-                trickleDown(delIx);
-            }
-            return delEntry;
-        } else if (nextIx == ELEMENT_SIZE) {
-            return (removeLast());
-        }
-        return null;
-    }
-
-    private int[] removeLast() {
-        if (nextIx < ELEMENT_SIZE) { //this is the very last element
-            return new int[] { -1, -1, -1, -1 };
-        }
-        int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
-        Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
-        nextIx -= ELEMENT_SIZE;
-        return l;
-    }
-
-    private void bubbleUp(int ix) throws HyracksDataException {
-        int p = getParentIx(ix);
-        if (isAtMinLevel(ix)) {
-            if (p != NOT_EXIST && compare(p, ix) < 0) {
-                swap(ix, p);
-                bubbleUpMax(p);
-            } else {
-                bubbleUpMin(ix);
-            }
-        } else { // i is at max level
-            if (p != NOT_EXIST && compare(ix, p) < 0) {
-                swap(ix, p);
-                bubbleUpMin(p);
-            } else {
-                bubbleUpMax(ix);
-            }
-        }
-    }
-
-    private void bubbleUpMax(int ix) throws HyracksDataException {
-        int gp = getGrandParent(ix);
-        if (gp != NOT_EXIST && compare(gp, ix) < 0) {
-            swap(ix, gp);
-            bubbleUpMax(gp);
-        }
-    }
-
-    private void bubbleUpMin(int ix) throws HyracksDataException {
-        int gp = getGrandParent(ix);
-        if (gp != NOT_EXIST && compare(ix, gp) < 0) {
-            swap(ix, gp);
-            bubbleUpMin(gp);
-        }
-    }
-
-    private void trickleDown(int ix) throws HyracksDataException {
-        if (isAtMinLevel(ix)) {
-            trickleDownMin(ix);
-        } else {
-            trickleDownMax(ix);
-        }
-    }
-
-    private void trickleDownMax(int ix) throws HyracksDataException {
-        int maxIx = getMaxOfDescendents(ix);
-        if (maxIx == NOT_EXIST) {
-            return;
-        }
-        if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grand
-                                                                     // children
-            if (compare(ix, maxIx) < 0) {
-                swap(maxIx, ix);
-                int p = getParentIx(maxIx);
-                if (p != NOT_EXIST && compare(maxIx, p) < 0) {
-                    swap(maxIx, p);
-                }
-                trickleDownMax(maxIx);
-            }
-        } else { // A children
-            if (compare(ix, maxIx) < 0) {
-                swap(ix, maxIx);
-            }
-        }
-    }
-
-    private void trickleDownMin(int ix) throws HyracksDataException {
-        int minIx = getMinOfDescendents(ix);
-        if (minIx == NOT_EXIST) {
-            return;
-        }
-        if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) { // A grand
-                                                                     // children
-            if (compare(minIx, ix) < 0) {
-                swap(minIx, ix);
-                int p = getParentIx(minIx);
-                if (p != NOT_EXIST && compare(p, minIx) < 0) {
-                    swap(minIx, p);
-                }
-                trickleDownMin(minIx);
-            }
-        } else { // A children
-            if (compare(minIx, ix) < 0) {
-                swap(ix, minIx);
-            }
-        }
-    }
-
-    // Min among children and grand children
-    private int getMinOfDescendents(int ix) throws HyracksDataException {
-        int lc = getLeftChild(ix);
-        if (lc == NOT_EXIST) {
-            return NOT_EXIST;
-        }
-        int rc = getRightChild(ix);
-        if (rc == NOT_EXIST) {
-            return lc;
-        }
-        int min = (compare(lc, rc) < 0) ? lc : rc;
-        int[] lgc = getLeftGrandChildren(ix);
-        int[] rgc = getRightGrandChildren(ix);
-        for (int k = 0; k < 2; k++) {
-            if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
-                min = lgc[k];
-            }
-            if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
-                min = rgc[k];
-            }
-        }
-        return min;
-    }
-
-    // Max among children and grand children
-    private int getMaxOfDescendents(int ix) throws HyracksDataException {
-        int lc = getLeftChild(ix);
-        if (lc == NOT_EXIST) {
-            return NOT_EXIST;
-        }
-        int rc = getRightChild(ix);
-        if (rc == NOT_EXIST) {
-            return lc;
-        }
-        int max = (compare(lc, rc) < 0) ? rc : lc;
-        int[] lgc = getLeftGrandChildren(ix);
-        int[] rgc = getRightGrandChildren(ix);
-        for (int k = 0; k < 2; k++) {
-            if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
-                max = lgc[k];
-            }
-            if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
-                max = rgc[k];
-            }
-        }
-        return max;
-    }
-
-    private void swap(int n1Ix, int n2Ix) {
-        int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
-        for (int i = 0; i < ELEMENT_SIZE; i++) {
-            elements[n1Ix + i] = elements[n2Ix + i];
-            elements[n2Ix + i] = temp[i];
-        }
-    }
-
-    private int getParentIx(int i) {
-        if (i < ELEMENT_SIZE) {
-            return NOT_EXIST;
-        }
-        return ((i - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
-    }
-
-    private int getGrandParent(int i) {
-        int p = getParentIx(i);
-        return p != -1 ? getParentIx(p) : NOT_EXIST;
-    }
-
-    private int getLeftChild(int i) {
-        int lc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + ELEMENT_SIZE;
-        return (lc < nextIx ? lc : -1);
-    }
-
-    private int[] getLeftGrandChildren(int i) {
-        int lc = getLeftChild(i);
-        return lc != NOT_EXIST ? new int[] { getLeftChild(lc), getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
-    }
-
-    private int getRightChild(int i) {
-        int rc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
-        return (rc < nextIx ? rc : -1);
-    }
-
-    private int[] getRightGrandChildren(int i) {
-        int rc = getRightChild(i);
-        return rc != NOT_EXIST ? new int[] { getLeftChild(rc), getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
-    }
-
-    private boolean isAtMinLevel(int i) {
-        int l = getLevel(i);
-        return l % 2 == 0 ? true : false;
-    }
-
-    private int getLevel(int i) {
-        if (i < ELEMENT_SIZE) {
-            return 0;
-        }
-
-        int cnv = i / ELEMENT_SIZE;
-        int l = (int) Math.floor(Math.log(cnv) / Math.log(2));
-        if (cnv == (((int) Math.pow(2, (l + 1))) - 1)) {
-            return (l + 1);
-        }
-        return l;
-    }
-
-    private ByteBuffer getFrame(int frameIx) {
-        return (memMgr.getFrame(frameIx));
-    }
-
-    // first < sec : -1
-    private int compare(int nodeSIx1, int nodeSIx2) throws HyracksDataException {
-        int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE); //tree.get(nodeSIx1);
-        int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE); //tree.get(nodeSIx2);
-        return (compare(n1, n2));
-    }
-
-    // first < sec : -1
-    private int compare(int[] n1, int[] n2) throws HyracksDataException {
-        // Compare Run Numbers
-        if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
-            return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
-        }
-
-        // Compare Poor man Normalized Keys
-        if (n1[PNK_IX] != n2[PNK_IX]) {
-            return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
-        }
-
-        return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
-    }
-
-    private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset)
-            throws HyracksDataException {
-        byte[] b1 = fr1.array();
-        byte[] b2 = fr2.array();
-        fta1.reset(fr1);
-        fta2.reset(fr2);
-        int headerLen = BSTNodeUtil.HEADER_SIZE;
-        r1StartOffset += headerLen;
-        r2StartOffset += headerLen;
-        for (int f = 0; f < comparators.length; ++f) {
-            int fIdx = sortFields[f];
-            int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
-            int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
-            int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
-            int l1 = f1End - f1Start;
-            int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
-            int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
-            int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
-            int l2 = f2End - f2Start;
-
-            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
new file mode 100644
index 0000000..ee43993
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.value.*;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+
+import java.util.List;
+
+/**
+ * Sort operator descriptor that threads a {@code topK} limit through both sort phases: the sort
+ * activity builds a {@link HybridTopKSortRunGenerator} and the merge activity builds an
+ * {@link ExternalSortRunMerger}, each receiving {@code topK}.
+ * NOTE(review): presumably only the first {@code topK} tuples in sort order are emitted; the exact
+ * semantics live in those two classes — confirm there.
+ */
+public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
+
+    // Limit forwarded to the run generator and to the run merger.
+    private final int topK;
+
+    /**
+     * @param spec
+     *            operator descriptor registry, passed to the superclass
+     * @param framesLimit
+     *            frame budget, passed to the superclass and later to the run generator
+     * @param topK
+     *            limit forwarded to the run generator and the run merger
+     * @param sortFields
+     *            indexes of the sort key fields
+     * @param firstKeyNormalizerFactory
+     *            normalized-key computer factory, passed to the superclass and the run generator
+     * @param comparatorFactories
+     *            comparator factories for the sort fields
+     * @param recordDescriptor
+     *            record descriptor, passed to the superclass
+     */
+    public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        this.topK = topK;
+    }
+
+    /** Builds the run-generation activity, which uses the top-K aware hybrid run generator. */
+    @Override
+    public SortActivity getSortActivity(ActivityId id) {
+        return new SortActivity(id) {
+            @Override
+            protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider) {
+                RecordDescriptor sortedRecDesc = recordDescriptors[0];
+                return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, firstKeyNormalizerFactory,
+                        comparatorFactories, sortedRecDesc);
+            }
+        };
+    }
+
+    /** Builds the merge activity, whose run merger also receives {@code topK}. */
+    @Override
+    public MergeActivity getMergeActivity(ActivityId id) {
+        return new MergeActivity(id) {
+            @Override
+            protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
+                    List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators,
+                    INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+                RecordDescriptor sortedRecDesc = recordDescriptors[0];
+                return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer,
+                        sortedRecDesc, necessaryFrames, topK, writer);
+            }
+        };
+    }
+}