You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:25 UTC

[08/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
deleted file mode 100644
index 110bddb..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ /dev/null
@@ -1,717 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * @author pouria Implements Memory Manager based on creating Binary Search Tree
- *         (BST) while Free slot size is the key for the BST nodes. Each node in
- *         BST shows a class of free slots, while all the free slots within a
- *         class have same lengths. Slots in a class are stored as a LinkedList,
- *         whose head is the BST node, corresponding to that class. BST is not
- *         stored as a separate data structure, but the free slots in the memory
- *         are used to hold BST nodes. Each BST node has the logical structure,
- *         defined in the BSTNodeUtil class.
- */
-public class BSTMemMgr implements IMemoryManager {
-
-    private final IHyracksTaskContext ctx;
-    public static int frameSize;
-
-    private ByteBuffer[] frames;
-    private ByteBuffer convertBuffer;
-    private Slot root;
-    private Slot result; // A reusable object to hold one node returned as
-                         // method result
-    private Slot insertSlot; // A reusable object to hold one node within insert
-                             // process
-    private Slot lastLeftParent; // A reusable object for the search process
-    private Slot lastLeft; // A reusable object for the search process
-    private Slot parent; // A reusable object for the search process
-
-    private Slot[] parentRes;
-    private int lastFrame;
-
-    public BSTMemMgr(IHyracksTaskContext ctx, int memSize) {
-        this.ctx = ctx;
-        frameSize = ctx.getFrameSize();
-        convertBuffer = ByteBuffer.allocate(4);
-        frames = new ByteBuffer[memSize];
-        lastFrame = -1;
-        root = new Slot();
-        insertSlot = new Slot();
-        result = new Slot();
-        lastLeftParent = new Slot();
-        lastLeft = new Slot();
-        parent = new Slot();
-        parentRes = new Slot[] { new Slot(), new Slot() };
-    }
-
-    /**
-     * result is the container sent by the caller to hold the results
-     */
-    @Override
-    public void allocate(int length, Slot result) throws HyracksDataException {
-        search(length, parentRes);
-        if (parentRes[1].isNull()) {
-            addFrame(parentRes);
-            if (parentRes[1].isNull()) {
-                return;
-            }
-        }
-
-        int sl = BSTNodeUtil.getLength(parentRes[1], frames, convertBuffer);
-        int acLen = BSTNodeUtil.getActualLength(length);
-        if (shouldSplit(sl, acLen)) {
-            int[] s = split(parentRes[1], parentRes[0], acLen);
-            int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames, convertBuffer);
-            insert(s[2], s[3], insertLen); // inserting second half of the split
-                                           // slot
-            BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
-            result.set(s[0], s[1]);
-            return;
-        }
-        allocate(parentRes[1], parentRes[0], length, result);
-    }
-
-    @Override
-    public int unallocate(Slot s) throws HyracksDataException {
-        int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer);
-        int actualLen = BSTNodeUtil.getActualLength(usedLen);
-        int fix = s.getFrameIx();
-        int off = s.getOffset();
-
-        int prevMemSlotFooterOffset = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
-                : BSTNodeUtil.INVALID_INDEX);
-        int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
-        int nextMemSlotHeaderOffset = (t < frameSize ? t : BSTNodeUtil.INVALID_INDEX);
-        // Remember: next and prev memory slots have the same frame index as the
-        // unallocated slot
-        if (!isNodeNull(fix, prevMemSlotFooterOffset) && BSTNodeUtil.isFree(fix, prevMemSlotFooterOffset, frames)) {
-            int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooterOffset, frames, convertBuffer);
-            removeFromList(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE);
-            int concatLength = actualLen + leftLength + 2 * BSTNodeUtil.HEADER_SIZE;
-            if (!isNodeNull(fix, nextMemSlotHeaderOffset) && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
-                removeFromList(fix, nextMemSlotHeaderOffset);
-                concatLength += BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer) + 2
-                        * BSTNodeUtil.HEADER_SIZE;
-            }
-            insert(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE, concatLength); // newly
-                                                                                                       // (merged)
-                                                                                                       // slot
-                                                                                                       // starts
-                                                                                                       // at
-                                                                                                       // the
-                                                                                                       // prev
-                                                                                                       // slot
-                                                                                                       // offset
-            return concatLength;
-
-        } else if (!isNodeNull(fix, nextMemSlotHeaderOffset)
-                && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
-            removeFromList(fix, nextMemSlotHeaderOffset);
-            int concatLength = actualLen + BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer)
-                    + 2 * BSTNodeUtil.HEADER_SIZE;
-            insert(fix, off, concatLength); // newly (merged) slot starts at the
-                                            // unallocated slot offset
-            return concatLength;
-        }
-        // unallocated slot is not merging with any neighbor
-        insert(fix, off, actualLen);
-        return actualLen;
-    }
-
-    @Override
-    public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) {
-        int offToRead = offset + BSTNodeUtil.HEADER_SIZE;
-        int length = BSTNodeUtil.getLength(frameIx, offset, frames, convertBuffer);
-        return dest.append(frames[frameIx].array(), offToRead, length);
-    }
-
-    @Override
-    public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex) {
-        int offToCopy = offset + BSTNodeUtil.HEADER_SIZE;
-        int tStartOffset = src.getTupleStartOffset(tIndex);
-        int tEndOffset = src.getTupleEndOffset(tIndex);
-        int tupleLength = tEndOffset - tStartOffset;
-        ByteBuffer srcBuffer = src.getBuffer();
-        System.arraycopy(srcBuffer.array(), tStartOffset, frames[frameIx].array(), offToCopy, tupleLength);
-        return true;
-    }
-
-    @Override
-    public ByteBuffer getFrame(int frameIndex) {
-        return frames[frameIndex];
-    }
-
-    @Override
-    public void close() {
-        //clean up all frames
-        for (int i = 0; i < frames.length; i++)
-            frames[i] = null;
-    }
-
-    /**
-     * @param parentResult
-     *            is the container passed by the caller to contain the results
-     * @throws HyracksDataException
-     */
-    private void addFrame(Slot[] parentResult) throws HyracksDataException {
-        clear(parentResult);
-        if ((lastFrame + 1) >= frames.length) {
-            return;
-        }
-        frames[++lastFrame] = allocateFrame();
-        int l = frameSize - 2 * BSTNodeUtil.HEADER_SIZE;
-        BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
-        initNewNode(lastFrame, 0);
-
-        parentResult[1].copy(root);
-        if (parentResult[1].isNull()) { // root is null
-            root.set(lastFrame, 0);
-            initNewNode(root.getFrameIx(), root.getOffset());
-            parentResult[1].copy(root);
-            return;
-        }
-
-        while (!parentResult[1].isNull()) {
-            if (BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer) == l) {
-                append(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0);
-                parentResult[1].set(lastFrame, 0);
-                return;
-            }
-            if (l < BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer)) {
-                if (isNodeNull(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
-                        BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer))) {
-                    BSTNodeUtil.setLeftChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
-                            frames);
-                    parentResult[0].copy(parentResult[1]);
-                    parentResult[1].set(lastFrame, 0);
-                    return;
-                } else {
-                    parentResult[0].copy(parentResult[1]);
-                    parentResult[1].set(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
-                            BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer));
-                }
-            } else {
-                if (isNodeNull(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
-                        BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer))) {
-                    BSTNodeUtil.setRightChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
-                            frames);
-                    parentResult[0].copy(parentResult[1]);
-                    parentResult[1].set(lastFrame, 0);
-                    return;
-                } else {
-                    parentResult[0].copy(parentResult[1]);
-                    parentResult[1].set(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
-                            BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer));
-                }
-            }
-        }
-        throw new HyracksDataException("New Frame could not be added to BSTMemMgr");
-    }
-
-    private void insert(int fix, int off, int length) throws HyracksDataException {
-        BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
-        initNewNode(fix, off);
-
-        if (root.isNull()) {
-            root.set(fix, off);
-            return;
-        }
-
-        insertSlot.clear();
-        insertSlot.copy(root);
-        while (!insertSlot.isNull()) {
-            int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames, convertBuffer);
-            if (curSlotLen == length) {
-                append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off);
-                return;
-            }
-            if (length < curSlotLen) {
-                int leftChildFIx = BSTNodeUtil.getLeftChildFrameIx(insertSlot, frames, convertBuffer);
-                int leftChildOffset = BSTNodeUtil.getLeftChildOffset(insertSlot, frames, convertBuffer);
-                if (isNodeNull(leftChildFIx, leftChildOffset)) {
-                    initNewNode(fix, off);
-                    BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
-                    return;
-                } else {
-                    insertSlot.set(leftChildFIx, leftChildOffset);
-                }
-            } else {
-                int rightChildFIx = BSTNodeUtil.getRightChildFrameIx(insertSlot, frames, convertBuffer);
-                int rightChildOffset = BSTNodeUtil.getRightChildOffset(insertSlot, frames, convertBuffer);
-                if (isNodeNull(rightChildFIx, rightChildOffset)) {
-                    initNewNode(fix, off);
-                    BSTNodeUtil.setRightChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
-                    return;
-                } else {
-                    insertSlot.set(rightChildFIx, rightChildOffset);
-                }
-            }
-        }
-        throw new HyracksDataException("Failure in node insertion into BST in BSTMemMgr");
-    }
-
-    /**
-     * @param length
-     * @param target
-     *            is the container sent by the caller to hold the results
-     */
-    private void search(int length, Slot[] target) {
-        clear(target);
-        result.clear();
-
-        if (root.isNull()) {
-            return;
-        }
-
-        lastLeftParent.clear();
-        lastLeft.clear();
-        parent.clear();
-        result.copy(root);
-
-        while (!result.isNull()) {
-            if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
-                target[0].copy(parent);
-                target[1].copy(result);
-                return;
-            }
-            if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) {
-                lastLeftParent.copy(parent);
-                lastLeft.copy(result);
-                parent.copy(result);
-                int fix = BSTNodeUtil.getLeftChildFrameIx(result, frames, convertBuffer);
-                int off = BSTNodeUtil.getLeftChildOffset(result, frames, convertBuffer);
-                result.set(fix, off);
-            } else {
-                parent.copy(result);
-                int fix = BSTNodeUtil.getRightChildFrameIx(result, frames, convertBuffer);
-                int off = BSTNodeUtil.getRightChildOffset(result, frames, convertBuffer);
-                result.set(fix, off);
-            }
-        }
-
-        target[0].copy(lastLeftParent);
-        target[1].copy(lastLeft);
-
-    }
-
-    private void append(int headFix, int headOff, int nodeFix, int nodeOff) {
-        initNewNode(nodeFix, nodeOff);
-
-        int fix = BSTNodeUtil.getNextFrameIx(headFix, headOff, frames, convertBuffer); // frameIx
-        // for
-        // the
-        // current
-        // next
-        // of
-        // head
-        int off = BSTNodeUtil.getNextOffset(headFix, headOff, frames, convertBuffer); // offset
-                                                                                      // for
-                                                                                      // the
-                                                                                      // current
-                                                                                      // next
-                                                                                      // of
-                                                                                      // head
-        BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
-
-        if (!isNodeNull(fix, off)) {
-            BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
-        }
-        BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
-        BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
-    }
-
-    private int[] split(Slot listHead, Slot parent, int length) {
-        int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer) - length - 2 * BSTNodeUtil.HEADER_SIZE;
-        // We split the node after slots-list head
-        if (!isNodeNull(BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer),
-                BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer))) {
-            int afterHeadFix = BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer);
-            int afterHeadOff = BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer);
-            int afHNextFix = BSTNodeUtil.getNextFrameIx(afterHeadFix, afterHeadOff, frames, convertBuffer);
-            int afHNextOff = BSTNodeUtil.getNextOffset(afterHeadFix, afterHeadOff, frames, convertBuffer);
-            BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(), afHNextFix, afHNextOff, frames);
-            if (!isNodeNull(afHNextFix, afHNextOff)) {
-                BSTNodeUtil.setPrev(afHNextFix, afHNextOff, listHead.getFrameIx(), listHead.getOffset(), frames);
-            }
-            int secondOffset = afterHeadOff + length + 2 * BSTNodeUtil.HEADER_SIZE;
-            BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length, true, frames);
-            BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true, frames);
-
-            return new int[] { afterHeadFix, afterHeadOff, afterHeadFix, secondOffset };
-        }
-        // We split the head
-        int secondOffset = listHead.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE;
-        BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), listHead.getOffset(), length, true, frames);
-        BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2, true, frames);
-
-        fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(), parent.getFrameIx(), parent.getOffset());
-        return new int[] { listHead.getFrameIx(), listHead.getOffset(), listHead.getFrameIx(), secondOffset };
-    }
-
-    private void fixTreePtrs(int nodeFrameIx, int nodeOffset, int parentFrameIx, int parentOffset) {
-        int nodeLeftChildFrameIx = BSTNodeUtil.getLeftChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
-        int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
-        int nodeRightChildFrameIx = BSTNodeUtil.getRightChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
-        int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
-
-        int status = -1; // (status==0 if node is left child of parent)
-                         // (status==1 if node is right child of parent)
-        if (!isNodeNull(parentFrameIx, parentOffset)) {
-            int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil
-                    .getLength(nodeFrameIx, nodeOffset, frames, convertBuffer));
-            int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(parentFrameIx, parentOffset, frames,
-                    convertBuffer));
-            status = ((nlen < plen) ? 0 : 1);
-        }
-
-        if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)
-                && !isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) { // Node
-            // has
-            // two
-            // children
-            int pMinFIx = nodeFrameIx;
-            int pMinOff = nodeOffset;
-            int minFIx = nodeRightChildFrameIx;
-            int minOff = nodeRightChildOffset;
-            int nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer);
-            int nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer);
-
-            while (!isNodeNull(nextLeftFIx, nextLeftOff)) {
-                pMinFIx = minFIx;
-                pMinOff = minOff;
-                minFIx = nextLeftFIx;
-                minOff = nextLeftOff;
-                nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer); // min
-                                                                                                      // is
-                                                                                                      // now
-                                                                                                      // pointing
-                                                                                                      // to
-                                                                                                      // current
-                                                                                                      // (old)
-                                                                                                      // next
-                                                                                                      // left
-                nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer); // min
-                                                                                                     // is
-                                                                                                     // now
-                                                                                                     // pointing
-                                                                                                     // to
-                                                                                                     // current
-                                                                                                     // (old)
-                                                                                                     // next
-                                                                                                     // left
-            }
-
-            if ((nodeRightChildFrameIx == minFIx) && (nodeRightChildOffset == minOff)) { // nrc
-                                                                                         // is
-                                                                                         // the
-                // same as min
-                BSTNodeUtil.setLeftChild(nodeRightChildFrameIx, nodeRightChildOffset, nodeLeftChildFrameIx,
-                        nodeLeftChildOffset, frames);
-            } else { // min is different from nrc
-                int minRightFIx = BSTNodeUtil.getRightChildFrameIx(minFIx, minOff, frames, convertBuffer);
-                int minRightOffset = BSTNodeUtil.getRightChildOffset(minFIx, minOff, frames, convertBuffer);
-                BSTNodeUtil.setRightChild(minFIx, minOff, nodeRightChildFrameIx, nodeRightChildOffset, frames);
-                BSTNodeUtil.setLeftChild(minFIx, minOff, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
-                BSTNodeUtil.setLeftChild(pMinFIx, pMinOff, minRightFIx, minRightOffset, frames);
-            }
-
-            // Now dealing with the parent
-            if (!isNodeNull(parentFrameIx, parentOffset)) {
-                if (status == 0) {
-                    BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
-                } else if (status == 1) {
-                    BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
-                }
-            } else { // No parent (node was the root)
-                root.set(minFIx, minOff);
-            }
-            return;
-        }
-
-        else if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)) { // Node
-                                                                           // has
-                                                                           // only
-                                                                           // left
-                                                                           // child
-            if (status == 0) {
-                BSTNodeUtil
-                        .setLeftChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
-            } else if (status == 1) {
-                BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset,
-                        frames);
-            } else if (status == -1) { // No parent, so node is root
-                root.set(nodeLeftChildFrameIx, nodeLeftChildOffset);
-            }
-            return;
-        }
-
-        else if (!isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) { // Node
-                                                                             // has
-                                                                             // only
-                                                                             // right
-                                                                             // child
-            if (status == 0) {
-                BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
-                        frames);
-            } else if (status == 1) {
-                BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
-                        frames);
-            } else if (status == -1) { // No parent, so node is root
-                root.set(nodeRightChildFrameIx, nodeRightChildOffset);
-            }
-            return;
-        }
-
-        else { // Node is leaf (no children)
-            if (status == 0) {
-                BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
-                        BSTNodeUtil.INVALID_INDEX, frames);
-            } else if (status == 1) {
-                BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
-                        BSTNodeUtil.INVALID_INDEX, frames);
-            } else { // node was the only node in the tree
-                root.clear();
-            }
-            return;
-        }
-    }
-
-    /**
-     * Allocation with no splitting but padding
-     * 
-     * @param node
-     * @param parent
-     * @param result
-     *            is the container sent by the caller to hold the results
-     */
-    private void allocate(Slot node, Slot parent, int length, Slot result) {
-        int nextFix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
-        int nextOff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
-        if (!isNodeNull(nextFix, nextOff)) {
-            int nextOfNextFIx = BSTNodeUtil.getNextFrameIx(nextFix, nextOff, frames, convertBuffer);
-            int nextOfNextOffset = BSTNodeUtil.getNextOffset(nextFix, nextOff, frames, convertBuffer);
-            BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(), nextOfNextFIx, nextOfNextOffset, frames);
-            if (!isNodeNull(nextOfNextFIx, nextOfNextOffset)) {
-                BSTNodeUtil.setPrev(nextOfNextFIx, nextOfNextOffset, node.getFrameIx(), node.getOffset(), frames);
-            }
-            BSTNodeUtil.setHeaderFooter(nextFix, nextOff, length, false, frames);
-            result.set(nextFix, nextOff);
-            return;
-        }
-
-        fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(), parent.getOffset());
-        BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(), length, false, frames);
-        result.copy(node);
-    }
-
-    private void removeFromList(int fix, int off) {
-        int nextFIx = BSTNodeUtil.getNextFrameIx(fix, off, frames, convertBuffer);
-        int nextOffset = BSTNodeUtil.getNextOffset(fix, off, frames, convertBuffer);
-        int prevFIx = BSTNodeUtil.getPrevFrameIx(fix, off, frames, convertBuffer);
-        int prevOffset = BSTNodeUtil.getPrevOffset(fix, off, frames, convertBuffer);
-        if (!isNodeNull(prevFIx, prevOffset) && !isNodeNull(nextFIx, nextOffset)) {
-            BSTNodeUtil.setNext(prevFIx, prevOffset, nextFIx, nextOffset, frames);
-            BSTNodeUtil.setPrev(nextFIx, nextOffset, prevFIx, prevOffset, frames);
-            BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-            BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-            return;
-        }
-        if (!isNodeNull(prevFIx, prevOffset)) {
-            BSTNodeUtil.setNext(prevFIx, prevOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-            BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-            return;
-        }
-
-        // We need to find the parent, so we can fix the tree
-        int parentFIx = BSTNodeUtil.INVALID_INDEX;
-        int parentOffset = BSTNodeUtil.INVALID_INDEX;
-        int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix, off, frames, convertBuffer));
-        fix = root.getFrameIx();
-        off = root.getOffset();
-        int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
-        while (length != curLen) {
-            parentFIx = fix;
-            parentOffset = off;
-            if (length < curLen) {
-                fix = BSTNodeUtil.getLeftChildFrameIx(parentFIx, parentOffset, frames, convertBuffer); // parentFIx
-                // is
-                // now
-                // the
-                // old(current)
-                // fix
-                off = BSTNodeUtil.getLeftChildOffset(parentFIx, parentOffset, frames, convertBuffer); // parentOffset
-                // is
-                // now
-                // the
-                // old(current)
-                // off
-            } else {
-                fix = BSTNodeUtil.getRightChildFrameIx(parentFIx, parentOffset, frames, convertBuffer); // parentFIx
-                // is
-                // now
-                // the
-                // old(current)
-                // fix
-                off = BSTNodeUtil.getRightChildOffset(parentFIx, parentOffset, frames, convertBuffer); // parentOffset
-                // is
-                // now
-                // the
-                // old(current)
-                // off
-            }
-            curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
-        }
-
-        if (!isNodeNull(nextFIx, nextOffset)) { // it is head of the list (in
-                                                // the
-            // tree)
-            BSTNodeUtil.setPrev(nextFIx, nextOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-            int nodeLeftChildFIx = BSTNodeUtil.getLeftChildFrameIx(fix, off, frames, convertBuffer);
-            int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(fix, off, frames, convertBuffer);
-            int nodeRightChildFix = BSTNodeUtil.getRightChildFrameIx(fix, off, frames, convertBuffer);
-            int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(fix, off, frames, convertBuffer);
-            BSTNodeUtil.setLeftChild(nextFIx, nextOffset, nodeLeftChildFIx, nodeLeftChildOffset, frames);
-            BSTNodeUtil.setRightChild(nextFIx, nextOffset, nodeRightChildFix, nodeRightChildOffset, frames);
-            if (!isNodeNull(parentFIx, parentOffset)) {
-                int parentLength = BSTNodeUtil.getLength(parentFIx, parentOffset, frames, convertBuffer);
-                if (length < parentLength) {
-                    BSTNodeUtil.setLeftChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
-                } else {
-                    BSTNodeUtil.setRightChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
-                }
-            }
-
-            if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
-                root.set(nextFIx, nextOffset);
-            }
-
-            return;
-        }
-
-        fixTreePtrs(fix, off, parentFIx, parentOffset);
-    }
-
-    private void clear(Slot[] s) {
-        s[0].clear();
-        s[1].clear();
-    }
-
-    private boolean isNodeNull(int frameIx, int offset) {
-        return ((frameIx == BSTNodeUtil.INVALID_INDEX) || (offset == BSTNodeUtil.INVALID_INDEX) || (frames[frameIx] == null));
-    }
-
-    private boolean shouldSplit(int slotLength, int reqLength) {
-        return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
-    }
-
-    private void initNewNode(int frameIx, int offset) {
-        BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-        BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-        BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-        BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
-    }
-
-    private ByteBuffer allocateFrame() throws HyracksDataException {
-        return ctx.allocateFrame();
-    }
-
-    public String debugPrintMemory() {
-        Slot s = new Slot(0, 0);
-        if (s.isNull()) {
-            return "memory:\tNull";
-        }
-
-        String m = "memory:\n" + debugPrintSlot(0, 0) + "\n";
-        int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0, frames, convertBuffer));
-        int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : length + 2
-                * BSTNodeUtil.HEADER_SIZE);
-        int nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length == 1) ? BSTNodeUtil.INVALID_INDEX : 1) : 0);
-        if (noff == BSTNodeUtil.INVALID_INDEX && nfix != BSTNodeUtil.INVALID_INDEX) {
-            noff = 0;
-        }
-        s.set(nfix, noff);
-        while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
-            m += debugPrintSlot(s.getFrameIx(), s.getOffset()) + "\n";
-            length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(s.getFrameIx(), s.getOffset(), frames,
-                    convertBuffer));
-            noff = (s.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : s
-                    .getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE);
-            nfix = (noff == BSTNodeUtil.INVALID_INDEX ? ((frames.length - 1 == s.getFrameIx()) ? BSTNodeUtil.INVALID_INDEX
-                    : s.getFrameIx() + 1)
-                    : s.getFrameIx());
-            if (noff == BSTNodeUtil.INVALID_INDEX && nfix != BSTNodeUtil.INVALID_INDEX) {
-                noff = 0;
-            }
-            s.set(nfix, noff);
-        }
-        return m;
-    }
-
-    public String debugPrintTree() {
-        Slot node = new Slot();
-        node.copy(root);
-        if (!node.isNull()) {
-            return debugPrintSubTree(node);
-        }
-        return "Null";
-    }
-
-    private String debugPrintSubTree(Slot r) {
-        Slot node = new Slot();
-        node.copy(r);
-        int fix = node.getFrameIx();
-        int off = node.getOffset();
-        int lfix = BSTNodeUtil.getLeftChildFrameIx(node, frames, convertBuffer);
-        int loff = BSTNodeUtil.getLeftChildOffset(node, frames, convertBuffer);
-        int rfix = BSTNodeUtil.getRightChildFrameIx(node, frames, convertBuffer);
-        int roff = BSTNodeUtil.getRightChildOffset(node, frames, convertBuffer);
-        int nfix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
-        int noff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
-        int pfix = BSTNodeUtil.getPrevFrameIx(node, frames, convertBuffer);
-        int poff = BSTNodeUtil.getPrevOffset(node, frames, convertBuffer);
-
-        String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
-                + BSTNodeUtil.getLength(fix, off, frames, convertBuffer) + ") - " + "(LC: "
-                + debugPrintSlot(lfix, loff) + ") - " + "(RC: " + debugPrintSlot(rfix, roff) + ") - " + "(NX: "
-                + debugPrintSlot(nfix, noff) + ") - " + "(PR: " + debugPrintSlot(pfix, poff) + ")  }\n";
-        if (!isNodeNull(lfix, loff)) {
-            s += debugPrintSubTree(new Slot(lfix, loff)) + "\n";
-        }
-        if (!isNodeNull(rfix, roff)) {
-            s += debugPrintSubTree(new Slot(rfix, roff)) + "\n";
-        }
-
-        return s;
-    }
-
-    private String debugPrintSlot(int fix, int off) {
-        if (isNodeNull(fix, off)) {
-            return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
-        }
-        int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
-        int al = BSTNodeUtil.getActualLength(l);
-        boolean f = BSTNodeUtil.isFree(fix, off, frames);
-        return fix + ", " + off + " (free: " + f + ") (Len: " + l + ") (actual len: " + al + ") ";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
deleted file mode 100644
index 8a1bcd3..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-
-/**
- * @author pouria
- *         Implements utility methods, used extensively and repeatedly within
- *         the BSTMemMgr.
- *         Mainly includes methods to set/get different types of pointers,
- *         required and accessed within BST traversal, along with the methods
- *         for setting/getting length/header/footer of free slots, which have
- *         been used as the containers for BST nodes.
- */
-public class BSTNodeUtil {
-
-    static final int MINIMUM_FREE_SLOT_SIZE = 32;
-
-    private static final int FRAME_PTR_SIZE = 4;
-    private static final int OFFSET_SIZE = 2;
-
-    static final int HEADER_SIZE = 2;
-    private static final int HEADER_INDEX = 0;
-
-    private static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
-    private static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
-
-    private static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
-    private static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
-
-    private static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
-    private static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
-
-    private static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
-    private static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
-
-    private static final byte INVALID = -128;
-    private static final byte MASK = 127;
-    static final int INVALID_INDEX = -1;
-
-    /*
-     * Structure of a free slot:
-     * [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER] MSB in the
-     * HEADER is set to 1 in a free slot
-     * 
-     * Structure of a used slot: [HEADER]...[FOOTER] MSB in the HEADER is set to
-     * 0 in a used slot
-     */
-
-    static int getLeftChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getLeftChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getLeftChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getLeftChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getLeftChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
-
-    }
-
-    static int getLeftChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
-    }
-
-    static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
-        setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(), lc.getOffset(), frames);
-    }
-
-    static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff, ByteBuffer[] frames) {
-        storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, lcFix);
-        storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, lcOff);
-    }
-
-    static int getRightChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getRightChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getRightChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getRightChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getRightChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
-    }
-
-    static int getRightChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
-    }
-
-    static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
-        setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(), rc.getOffset(), frames);
-    }
-
-    static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff, ByteBuffer[] frames) {
-        storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, rcFix);
-        storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, rcOff);
-    }
-
-    static int getNextFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getNextFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getNextOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getNextOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getNextFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
-    }
-
-    static int getNextOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
-    }
-
-    static void setNext(Slot node, Slot next, ByteBuffer[] frames) {
-        setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(), node.getOffset(), frames);
-    }
-
-    static void setNext(int nodeFix, int nodeOff, int nFix, int nOff, ByteBuffer[] frames) {
-        storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, nFix);
-        storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE, nOff);
-    }
-
-    static int getPrevFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getPrevFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getPrevOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getPrevOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getPrevFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
-    }
-
-    static int getPrevOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
-    }
-
-    static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
-        setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(), prev.getOffset(), frames);
-    }
-
-    static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff, ByteBuffer[] frames) {
-        storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE, pFix);
-        storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE, pOff);
-    }
-
-    static boolean slotsTheSame(Slot s, Slot t) {
-        return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t.getOffset()));
-    }
-
-    static void setHeaderFooter(int frameIx, int offset, int usedLength, boolean isFree, ByteBuffer[] frames) {
-        int slotLength = getActualLength(usedLength);
-        int footerOffset = offset + HEADER_SIZE + slotLength;
-        storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
-        storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
-        setFree(frameIx, offset, isFree, frames);
-        setFree(frameIx, footerOffset, isFree, frames);
-    }
-
-    static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
-    }
-
-    static int getLength(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
-        convertBuffer.clear();
-        for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
-            convertBuffer.put(i, (byte) 0x00);
-        }
-
-        convertBuffer.put(4 - HEADER_SIZE, (byte) ((frames[frameIx].get(offset)) & (MASK)));
-        System.arraycopy(frames[frameIx].array(), offset + 1, convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
-        return convertBuffer.getInt(0);
-    }
-
-    // MSB equal to 1 means FREE
-    static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
-        return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
-    }
-
-    static void setFree(int frameIx, int offset, boolean free, ByteBuffer[] frames) {
-        if (free) { // set MSB to 1 (for free)
-            frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) | 0x80));
-        } else { // set MSB to 0 (for used)
-            frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) & 0x7F));
-        }
-    }
-
-    static int getActualLength(int l) {
-        int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
-        return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
-    }
-
-    private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size, ByteBuffer convertBuffer) {
-        if ((b.get(fromIndex) & INVALID) == INVALID) {
-            return INVALID_INDEX;
-        }
-
-        convertBuffer.clear();
-        for (int i = 0; i < 4 - size; i++) { // padding
-            convertBuffer.put(i, (byte) 0x00);
-        }
-
-        System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size, size);
-        return convertBuffer.getInt(0);
-    }
-
-    private static void storeInt(ByteBuffer b, int fromIndex, int size, int value) {
-        if (value == INVALID_INDEX) {
-            b.put(fromIndex, INVALID);
-            return;
-        }
-        for (int i = 0; i < size; i++) {
-            b.put(fromIndex + i, (byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index e1315e7..3dc8b41 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -1,64 +1,42 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.*;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
 
-public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
+import java.util.List;
 
-    private static final int SORT_ACTIVITY_ID = 0;
-    private static final int MERGE_ACTIVITY_ID = 1;
+public class ExternalSortOperatorDescriptor extends AbstractSorterOperatorDescriptor {
 
-    private final int[] sortFields;
-    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final int framesLimit;
+    private static final long serialVersionUID = 1L;
 
     private Algorithm alg = Algorithm.MERGE_SORT;
+    private EnumFreeSlotPolicy policy = EnumFreeSlotPolicy.LAST_FIT;
+    private final int outputLimit;
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, Algorithm alg) {
-        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
-        this.alg = alg;
+        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
+                EnumFreeSlotPolicy.LAST_FIT);
     }
 
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
@@ -69,127 +47,52 @@ public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
     public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor) {
-        super(spec, 1, 1);
-        this.framesLimit = framesLimit;
-        this.sortFields = sortFields;
-        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
-        this.comparatorFactories = comparatorFactories;
-        if (framesLimit <= 1) {
-            throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
-        }
-        recordDescriptors[0] = recordDescriptor;
+        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+                Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT);
     }
 
     @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
-        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
-        builder.addActivity(this, sa);
-        builder.addSourceEdge(0, sa, 0);
-
-        builder.addActivity(this, ma);
-        builder.addTargetEdge(0, ma, 0);
-
-        builder.addBlockingEdge(sa, ma);
+    public AbstractSorterOperatorDescriptor.SortActivity getSortActivity(ActivityId id) {
+        return new AbstractSorterOperatorDescriptor.SortActivity(id) {
+            @Override
+            protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
+                return new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
+                        comparatorFactories, recordDescriptors[0], alg, policy, framesLimit, outputLimit);
+            }
+        };
     }
 
-    public static class SortTaskState extends AbstractStateObject {
-        private List<IFrameReader> runs;
-        private IFrameSorter frameSorter;
-
-        public SortTaskState() {
-        }
-
-        private SortTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
+    @Override
+    public AbstractSorterOperatorDescriptor.MergeActivity getMergeActivity(ActivityId id) {
+        return new AbstractSorterOperatorDescriptor.MergeActivity(id) {
+            @Override
+            protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter, List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators,
+                    INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+                return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators,
+                        nmkComputer, recordDescriptors[0], necessaryFrames, outputLimit, writer);
+            }
+        };
     }
 
-    private class SortActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public SortActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private ExternalSortRunGenerator runGen;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    runGen = new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
-                            comparatorFactories, recordDescriptors[0], alg, framesLimit);
-                    runGen.open();
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    runGen.nextFrame(buffer);
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
-                            getActivityId(), partition));
-                    runGen.close();
-                    state.runs = runGen.getRuns();
-                    state.frameSorter = runGen.getFrameSorter();
-                    ctx.setStateObject(state);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    runGen.fail();
-                }
-            };
-            return op;
-        }
+    public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy) {
+        this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
+                policy, Integer.MAX_VALUE);
     }
 
-    private class MergeActivity extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public MergeActivity(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                @Override
-                public void initialize() throws HyracksDataException {
-                    SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            SORT_ACTIVITY_ID), partition));
-                    List<IFrameReader> runs = state.runs;
-                    IFrameSorter frameSorter = state.frameSorter;
-                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-                    for (int i = 0; i < comparatorFactories.length; ++i) {
-                        comparators[i] = comparatorFactories[i].createBinaryComparator();
-                    }
-                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
-                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
-                    int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
-                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
-                            comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
-                    merger.process();
-                }
-            };
-            return op;
+    public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) {
+        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        if (framesLimit <= 1) {
+            throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
         }
+        this.alg = alg;
+        this.policy = policy;
+        this.outputLimit = outputLimit;
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index 3736fca..4048726 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -1,24 +1,21 @@
 /*
  * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -27,77 +24,89 @@ import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotSmallestFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotBiggestFirst;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameFreeSlotPolicy;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
 
-public class ExternalSortRunGenerator implements IFrameWriter {
-    private final IHyracksTaskContext ctx;
-    private final IFrameSorter frameSorter;
-    private final List<IFrameReader> runs;
-    private final int maxSortFrames;
+public class ExternalSortRunGenerator extends AbstractSortRunGenerator {
+
+    protected final IHyracksTaskContext ctx;
+    protected final IFrameSorter frameSorter;
+    protected final int maxSortFrames;
 
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
+        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg,
+                EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+    }
+
+    public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
+            throws HyracksDataException {
+        this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+                Integer.MAX_VALUE);
+    }
+
+    public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
+            throws HyracksDataException {
         this.ctx = ctx;
+        maxSortFrames = framesLimit - 1;
+
+        IFrameFreeSlotPolicy freeSlotPolicy = null;
+        switch (policy) {
+            case SMALLEST_FIT:
+                freeSlotPolicy = new FrameFreeSlotSmallestFit();
+                break;
+            case LAST_FIT:
+                freeSlotPolicy = new FrameFreeSlotLastFit(maxSortFrames);
+                break;
+            case BIGGEST_FIT:
+                freeSlotPolicy = new FrameFreeSlotBiggestFirst(maxSortFrames);
+                break;
+        }
+        IFrameBufferManager bufferManager = new VariableFrameMemoryManager(
+                new VariableFramePool(ctx, maxSortFrames * ctx.getInitialFrameSize()), freeSlotPolicy);
         if (alg == Algorithm.MERGE_SORT) {
-            frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
-                    recordDesc);
+            frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
+                    comparatorFactories, recordDesc, outputLimit);
         } else {
-            frameSorter = new FrameSorterQuickSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
-                    recordDesc);
+            frameSorter = new FrameSorterQuickSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
+                    comparatorFactories, recordDesc, outputLimit);
         }
-        runs = new LinkedList<IFrameReader>();
-        maxSortFrames = framesLimit - 1;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        runs.clear();
-        frameSorter.reset();
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (frameSorter.getFrameCount() >= maxSortFrames) {
+        if (!frameSorter.insertFrame(buffer)) {
             flushFramesToRun();
-        }
-        frameSorter.insertFrame(buffer);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (frameSorter.getFrameCount() > 0) {
-            if (runs.size() <= 0) {
-                frameSorter.sortFrames();
-            } else {
-                flushFramesToRun();
+            if (!frameSorter.insertFrame(buffer)) {
+                throw new HyracksDataException("The given frame is too big to insert into the sorting memory.");
             }
         }
     }
 
-    private void flushFramesToRun() throws HyracksDataException {
-        frameSorter.sortFrames();
+    protected RunFileWriter getRunFileWriter() throws HyracksDataException {
         FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
                 ExternalSortRunGenerator.class.getSimpleName());
-        RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
-        writer.open();
-        try {
-            frameSorter.flushFrames(writer);
-        } finally {
-            writer.close();
-        }
-        frameSorter.reset();
-        runs.add(writer.createReader());
+        return new RunFileWriter(file, ctx.getIOManager());
     }
 
-    @Override
-    public void fail() throws HyracksDataException {
+    protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
+        return writer;
     }
 
-    public IFrameSorter getFrameSorter() {
+    @Override
+    public ISorter getSorter() {
         return frameSorter;
     }
 
-    public List<IFrameReader> getRuns() {
-        return runs;
-    }
 }
\ No newline at end of file