You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:25 UTC
[08/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
deleted file mode 100644
index 110bddb..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ /dev/null
@@ -1,717 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Implements a memory manager based on a Binary Search Tree (BST) in which
- * the size of a free slot is the key of a BST node. Each node in the BST
- * represents a class of free slots, and all the free slots within a class
- * have the same length. The slots of a class are stored as a linked list
- * whose head is the BST node corresponding to that class. The BST is not
- * stored as a separate data structure; instead, the free slots in memory are
- * used to hold the BST nodes. Each BST node has the logical structure
- * defined in the BSTNodeUtil class.
- *
- * @author pouria
- */
-public class BSTMemMgr implements IMemoryManager {
-
- // Task context; used to read the frame size and to allocate new frames.
- private final IHyracksTaskContext ctx;
- // Size of one frame in bytes, read from ctx in the constructor.
- // NOTE(review): this field is static but is reassigned by every constructor
- // call, so all instances share the most recently assigned value - confirm
- // that this is intended.
- public static int frameSize;
-
- // Frame table with memSize entries; entries are filled one at a time by addFrame().
- private ByteBuffer[] frames;
- // 4-byte scratch buffer handed to the BSTNodeUtil getters.
- private ByteBuffer convertBuffer;
- // Root of the BST of free slots; a null slot while the tree is empty.
- private Slot root;
- // A reusable object to hold one node returned as method result
- private Slot result;
- // A reusable object to hold one node within insert process
- private Slot insertSlot;
- private Slot lastLeftParent; // A reusable object for the search process
- private Slot lastLeft; // A reusable object for the search process
- private Slot parent; // A reusable object for the search process
-
- // Reusable two-element search result: [0] is the parent, [1] is the node found.
- private Slot[] parentRes;
- // Index in frames of the most recently allocated frame; -1 before the first one.
- private int lastFrame;
-
- public BSTMemMgr(IHyracksTaskContext ctx, int memSize) {
- this.ctx = ctx;
- frameSize = ctx.getFrameSize();
- convertBuffer = ByteBuffer.allocate(4);
- frames = new ByteBuffer[memSize];
- lastFrame = -1;
- root = new Slot();
- insertSlot = new Slot();
- result = new Slot();
- lastLeftParent = new Slot();
- lastLeft = new Slot();
- parent = new Slot();
- parentRes = new Slot[] { new Slot(), new Slot() };
- }
-
- /**
-  * Allocates a slot for {@code length} bytes. The best-fitting free slot is
-  * looked up in the BST; when none fits, a new frame is added. A slot that is
-  * sufficiently larger than required is split and its remainder re-inserted
-  * as a free slot; otherwise the whole slot is handed out.
-  *
-  * @param length
-  *            the requested length
-  * @param result
-  *            container sent by the caller to hold the allocated slot; it is
-  *            left untouched when no slot could be obtained
-  * @throws HyracksDataException
-  *             if adding a frame or re-inserting the split remainder fails
-  */
- @Override
- public void allocate(int length, Slot result) throws HyracksDataException {
-     // parentRes is updated in place by search()/addFrame(), so these aliases stay valid.
-     final Slot fitParent = parentRes[0];
-     final Slot fit = parentRes[1];
-
-     search(length, parentRes);
-     if (fit.isNull()) {
-         addFrame(parentRes);
-         if (fit.isNull()) {
-             return;
-         }
-     }
-
-     final int freeLength = BSTNodeUtil.getLength(fit, frames, convertBuffer);
-     final int actualLength = BSTNodeUtil.getActualLength(length);
-     if (!shouldSplit(freeLength, actualLength)) {
-         allocate(fit, fitParent, length, result);
-         return;
-     }
-
-     // pieces = { allocated frameIx, allocated offset, remainder frameIx, remainder offset }
-     final int[] pieces = split(fit, fitParent, actualLength);
-     final int remainderLength = BSTNodeUtil.getLength(pieces[2], pieces[3], frames, convertBuffer);
-     insert(pieces[2], pieces[3], remainderLength);
-     BSTNodeUtil.setHeaderFooter(pieces[0], pieces[1], length, false, frames);
-     result.set(pieces[0], pieces[1]);
- }
-
- /**
- * Returns the slot s to the free pool. When the slot immediately before
- * and/or immediately after it in the same frame is free, the neighbours are
- * taken out of the BST and merged with s into one larger free slot before
- * the result is inserted into the BST.
- *
- * @param s
- * the allocated slot to release
- * @return the length the free slot was inserted with (the merged length when
- * neighbours were coalesced)
- * @throws HyracksDataException
- * if the free slot cannot be inserted into the BST
- */
- @Override
- public int unallocate(Slot s) throws HyracksDataException {
- int usedLen = BSTNodeUtil.getLength(s, frames, convertBuffer);
- int actualLen = BSTNodeUtil.getActualLength(usedLen);
- int fix = s.getFrameIx();
- int off = s.getOffset();
-
- // Footer of the slot that ends right before this one; invalid when this
- // slot starts at the beginning of the frame.
- int prevMemSlotFooterOffset = ((off - BSTNodeUtil.HEADER_SIZE) >= 0 ? (off - BSTNodeUtil.HEADER_SIZE)
- : BSTNodeUtil.INVALID_INDEX);
- // Header of the slot that starts right after this one; invalid when this
- // slot reaches the end of the frame.
- int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
- int nextMemSlotHeaderOffset = (t < frameSize ? t : BSTNodeUtil.INVALID_INDEX);
- // Remember: next and prev memory slots have the same frame index as the
- // unallocated slot
- if (!isNodeNull(fix, prevMemSlotFooterOffset) && BSTNodeUtil.isFree(fix, prevMemSlotFooterOffset, frames)) {
- int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooterOffset, frames, convertBuffer);
- // Step back from the previous slot's footer to its header before unlinking it.
- removeFromList(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE);
- // The footer of the previous slot and the header of this one become payload.
- int concatLength = actualLen + leftLength + 2 * BSTNodeUtil.HEADER_SIZE;
- if (!isNodeNull(fix, nextMemSlotHeaderOffset) && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
- removeFromList(fix, nextMemSlotHeaderOffset);
- concatLength += BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer) + 2
- * BSTNodeUtil.HEADER_SIZE;
- }
- // newly (merged) slot starts at the prev slot offset
- insert(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE, concatLength);
- return concatLength;
-
- } else if (!isNodeNull(fix, nextMemSlotHeaderOffset)
- && BSTNodeUtil.isFree(fix, nextMemSlotHeaderOffset, frames)) {
- removeFromList(fix, nextMemSlotHeaderOffset);
- int concatLength = actualLen + BSTNodeUtil.getLength(fix, nextMemSlotHeaderOffset, frames, convertBuffer)
- + 2 * BSTNodeUtil.HEADER_SIZE;
- // newly (merged) slot starts at the unallocated slot offset
- insert(fix, off, concatLength);
- return concatLength;
- }
- // unallocated slot is not merging with any neighbor
- insert(fix, off, actualLen);
- return actualLen;
- }
-
- /**
-  * Appends the content of the slot at ({@code frameIx}, {@code offset}) to
-  * {@code dest}. The content starts right after the slot header and its
-  * length is the one recorded for the slot.
-  *
-  * @return whatever {@code dest.append(...)} reports
-  */
- @Override
- public boolean readTuple(int frameIx, int offset, FrameTupleAppender dest) {
-     final int contentStart = offset + BSTNodeUtil.HEADER_SIZE;
-     final int contentLength = BSTNodeUtil.getLength(frameIx, offset, frames, convertBuffer);
-     final byte[] frameBytes = frames[frameIx].array();
-     return dest.append(frameBytes, contentStart, contentLength);
- }
-
- /**
-  * Copies tuple {@code tIndex} of {@code src} into the slot at
-  * ({@code frameIx}, {@code offset}), starting right after the slot header.
-  *
-  * @return always {@code true}
-  */
- @Override
- public boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex) {
-     final int tupleStart = src.getTupleStartOffset(tIndex);
-     final int tupleEnd = src.getTupleEndOffset(tIndex);
-     final byte[] sourceBytes = src.getBuffer().array();
-     final byte[] targetBytes = frames[frameIx].array();
-     System.arraycopy(sourceBytes, tupleStart, targetBytes, offset + BSTNodeUtil.HEADER_SIZE,
-             tupleEnd - tupleStart);
-     return true;
- }
-
- /**
-  * @return the frame stored at {@code frameIndex} in the frame table, which
-  *         is {@code null} for an entry that has not been allocated
-  */
- @Override
- public ByteBuffer getFrame(int frameIndex) {
-     final ByteBuffer frame = frames[frameIndex];
-     return frame;
- }
-
- @Override
- public void close() {
- //clean up all frames
- for (int i = 0; i < frames.length; i++)
- frames[i] = null;
- }
-
- /**
- * Allocates one more frame, turns it into a single free slot that spans the
- * whole frame, and links that slot into the BST. Does nothing beyond
- * clearing parentResult when the frame table is already full.
- *
- * @param parentResult
- * is the container passed by the caller to contain the results:
- * on return [1] is the new slot and [0] is the tree node the
- * descent last passed through (left cleared when the new slot
- * became the root, or when it was appended to the root's list)
- * @throws HyracksDataException
- * if the frame cannot be allocated or the slot cannot be linked
- */
- private void addFrame(Slot[] parentResult) throws HyracksDataException {
- clear(parentResult);
- if ((lastFrame + 1) >= frames.length) {
- return;
- }
- frames[++lastFrame] = allocateFrame();
- // The whole frame minus one header and one footer becomes a single free slot.
- int l = frameSize - 2 * BSTNodeUtil.HEADER_SIZE;
- BSTNodeUtil.setHeaderFooter(lastFrame, 0, l, true, frames);
- initNewNode(lastFrame, 0);
-
- parentResult[1].copy(root);
- if (parentResult[1].isNull()) { // root is null
- root.set(lastFrame, 0);
- initNewNode(root.getFrameIx(), root.getOffset());
- parentResult[1].copy(root);
- return;
- }
-
- // Descend from the root, keyed on slot length, until the new slot finds its place.
- while (!parentResult[1].isNull()) {
- if (BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer) == l) {
- // A class of this length already exists: join its list instead of adding a tree node.
- append(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0);
- parentResult[1].set(lastFrame, 0);
- return;
- }
- if (l < BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer)) {
- if (isNodeNull(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
- BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer))) {
- // Empty left position: attach the new slot here.
- BSTNodeUtil.setLeftChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
- frames);
- parentResult[0].copy(parentResult[1]);
- parentResult[1].set(lastFrame, 0);
- return;
- } else {
- parentResult[0].copy(parentResult[1]);
- parentResult[1].set(BSTNodeUtil.getLeftChildFrameIx(parentResult[1], frames, convertBuffer),
- BSTNodeUtil.getLeftChildOffset(parentResult[1], frames, convertBuffer));
- }
- } else {
- if (isNodeNull(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
- BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer))) {
- // Empty right position: attach the new slot here.
- BSTNodeUtil.setRightChild(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0,
- frames);
- parentResult[0].copy(parentResult[1]);
- parentResult[1].set(lastFrame, 0);
- return;
- } else {
- parentResult[0].copy(parentResult[1]);
- parentResult[1].set(BSTNodeUtil.getRightChildFrameIx(parentResult[1], frames, convertBuffer),
- BSTNodeUtil.getRightChildOffset(parentResult[1], frames, convertBuffer));
- }
- }
- }
- // Reached only if the descent lands on a null slot without finding a place.
- throw new HyracksDataException("New Frame could not be added to BSTMemMgr");
- }
-
- private void insert(int fix, int off, int length) throws HyracksDataException {
- BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
- initNewNode(fix, off);
-
- if (root.isNull()) {
- root.set(fix, off);
- return;
- }
-
- insertSlot.clear();
- insertSlot.copy(root);
- while (!insertSlot.isNull()) {
- int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames, convertBuffer);
- if (curSlotLen == length) {
- append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off);
- return;
- }
- if (length < curSlotLen) {
- int leftChildFIx = BSTNodeUtil.getLeftChildFrameIx(insertSlot, frames, convertBuffer);
- int leftChildOffset = BSTNodeUtil.getLeftChildOffset(insertSlot, frames, convertBuffer);
- if (isNodeNull(leftChildFIx, leftChildOffset)) {
- initNewNode(fix, off);
- BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
- return;
- } else {
- insertSlot.set(leftChildFIx, leftChildOffset);
- }
- } else {
- int rightChildFIx = BSTNodeUtil.getRightChildFrameIx(insertSlot, frames, convertBuffer);
- int rightChildOffset = BSTNodeUtil.getRightChildOffset(insertSlot, frames, convertBuffer);
- if (isNodeNull(rightChildFIx, rightChildOffset)) {
- initNewNode(fix, off);
- BSTNodeUtil.setRightChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
- return;
- } else {
- insertSlot.set(rightChildFIx, rightChildOffset);
- }
- }
- }
- throw new HyracksDataException("Failure in node insertion into BST in BSTMemMgr");
- }
-
- /**
- * @param length
- * @param target
- * is the container sent by the caller to hold the results
- */
- private void search(int length, Slot[] target) {
- clear(target);
- result.clear();
-
- if (root.isNull()) {
- return;
- }
-
- lastLeftParent.clear();
- lastLeft.clear();
- parent.clear();
- result.copy(root);
-
- while (!result.isNull()) {
- if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
- target[0].copy(parent);
- target[1].copy(result);
- return;
- }
- if (length < BSTNodeUtil.getLength(result, frames, convertBuffer)) {
- lastLeftParent.copy(parent);
- lastLeft.copy(result);
- parent.copy(result);
- int fix = BSTNodeUtil.getLeftChildFrameIx(result, frames, convertBuffer);
- int off = BSTNodeUtil.getLeftChildOffset(result, frames, convertBuffer);
- result.set(fix, off);
- } else {
- parent.copy(result);
- int fix = BSTNodeUtil.getRightChildFrameIx(result, frames, convertBuffer);
- int off = BSTNodeUtil.getRightChildOffset(result, frames, convertBuffer);
- result.set(fix, off);
- }
- }
-
- target[0].copy(lastLeftParent);
- target[1].copy(lastLeft);
-
- }
-
- private void append(int headFix, int headOff, int nodeFix, int nodeOff) {
- initNewNode(nodeFix, nodeOff);
-
- int fix = BSTNodeUtil.getNextFrameIx(headFix, headOff, frames, convertBuffer); // frameIx
- // for
- // the
- // current
- // next
- // of
- // head
- int off = BSTNodeUtil.getNextOffset(headFix, headOff, frames, convertBuffer); // offset
- // for
- // the
- // current
- // next
- // of
- // head
- BSTNodeUtil.setNext(nodeFix, nodeOff, fix, off, frames);
-
- if (!isNodeNull(fix, off)) {
- BSTNodeUtil.setPrev(fix, off, nodeFix, nodeOff, frames);
- }
- BSTNodeUtil.setPrev(nodeFix, nodeOff, headFix, headOff, frames);
- BSTNodeUtil.setNext(headFix, headOff, nodeFix, nodeOff, frames);
- }
-
- /**
- * Splits one free slot of the class headed by listHead into a first part of
- * the given length and a second part holding the remainder. When the list
- * has a slot after the head, that slot is unlinked and split, so the tree is
- * left untouched; otherwise the head itself is split and taken out of the
- * tree. Both parts are written as free slots; the caller is expected to mark
- * the first part as used and to re-insert the second part.
- *
- * @param listHead
- * the BST node (head of the list) of the class to split from
- * @param parent
- * the parent of listHead in the tree
- * @param length
- * the length of the first part
- * @return { first frameIx, first offset, second frameIx, second offset }
- */
- private int[] split(Slot listHead, Slot parent, int length) {
- // Remainder length: the second part needs a header and a footer of its own.
- int l2 = BSTNodeUtil.getLength(listHead, frames, convertBuffer) - length - 2 * BSTNodeUtil.HEADER_SIZE;
- // We split the node after slots-list head
- if (!isNodeNull(BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer),
- BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer))) {
- int afterHeadFix = BSTNodeUtil.getNextFrameIx(listHead, frames, convertBuffer);
- int afterHeadOff = BSTNodeUtil.getNextOffset(listHead, frames, convertBuffer);
- int afHNextFix = BSTNodeUtil.getNextFrameIx(afterHeadFix, afterHeadOff, frames, convertBuffer);
- int afHNextOff = BSTNodeUtil.getNextOffset(afterHeadFix, afterHeadOff, frames, convertBuffer);
- // Unlink the slot after the head: the head now points to that slot's successor.
- BSTNodeUtil.setNext(listHead.getFrameIx(), listHead.getOffset(), afHNextFix, afHNextOff, frames);
- if (!isNodeNull(afHNextFix, afHNextOff)) {
- BSTNodeUtil.setPrev(afHNextFix, afHNextOff, listHead.getFrameIx(), listHead.getOffset(), frames);
- }
- int secondOffset = afterHeadOff + length + 2 * BSTNodeUtil.HEADER_SIZE;
- BSTNodeUtil.setHeaderFooter(afterHeadFix, afterHeadOff, length, true, frames);
- BSTNodeUtil.setHeaderFooter(afterHeadFix, secondOffset, l2, true, frames);
-
- return new int[] { afterHeadFix, afterHeadOff, afterHeadFix, secondOffset };
- }
- // We split the head
- int secondOffset = listHead.getOffset() + length + 2 * BSTNodeUtil.HEADER_SIZE;
- BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), listHead.getOffset(), length, true, frames);
- BSTNodeUtil.setHeaderFooter(listHead.getFrameIx(), secondOffset, l2, true, frames);
-
- // The head was the only slot of its class, so its node has to leave the tree.
- // Note that fixTreePtrs reads the head's length after it has been rewritten above.
- fixTreePtrs(listHead.getFrameIx(), listHead.getOffset(), parent.getFrameIx(), parent.getOffset());
- return new int[] { listHead.getFrameIx(), listHead.getOffset(), listHead.getFrameIx(), secondOffset };
- }
-
- /**
- * Takes the node at (nodeFrameIx, nodeOffset) out of the BST by re-linking
- * its children and updating either the matching child pointer of its parent
- * or the root. A node with two children is replaced by the leftmost node of
- * its right subtree; a node with one child is replaced by that child; a leaf
- * is simply detached. The pointers stored in the removed node itself are not
- * modified.
- *
- * @param nodeFrameIx
- * frame index of the node to take out
- * @param nodeOffset
- * offset of the node to take out
- * @param parentFrameIx
- * frame index of the node's parent, or an invalid index when the
- * node is the root
- * @param parentOffset
- * offset of the node's parent, or an invalid index when the node
- * is the root
- */
- private void fixTreePtrs(int nodeFrameIx, int nodeOffset, int parentFrameIx, int parentOffset) {
- int nodeLeftChildFrameIx = BSTNodeUtil.getLeftChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
- int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
- int nodeRightChildFrameIx = BSTNodeUtil.getRightChildFrameIx(nodeFrameIx, nodeOffset, frames, convertBuffer);
- int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(nodeFrameIx, nodeOffset, frames, convertBuffer);
-
- // status stays -1 when there is no parent (node is the root)
- // (status==0 if node is left child of parent)
- // (status==1 if node is right child of parent)
- // The side is derived by comparing the two lengths, not by reading the
- // parent's child pointers.
- int status = -1;
- if (!isNodeNull(parentFrameIx, parentOffset)) {
- int nlen = BSTNodeUtil.getActualLength(BSTNodeUtil
- .getLength(nodeFrameIx, nodeOffset, frames, convertBuffer));
- int plen = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(parentFrameIx, parentOffset, frames,
- convertBuffer));
- status = ((nlen < plen) ? 0 : 1);
- }
-
- // Node has two children
- if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)
- && !isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) {
- // Find min, the leftmost node of the right subtree, and pMin, its parent.
- int pMinFIx = nodeFrameIx;
- int pMinOff = nodeOffset;
- int minFIx = nodeRightChildFrameIx;
- int minOff = nodeRightChildOffset;
- int nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer);
- int nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer);
-
- while (!isNodeNull(nextLeftFIx, nextLeftOff)) {
- pMinFIx = minFIx;
- pMinOff = minOff;
- minFIx = nextLeftFIx;
- minOff = nextLeftOff;
- // min is now pointing to current (old) next left
- nextLeftFIx = BSTNodeUtil.getLeftChildFrameIx(minFIx, minOff, frames, convertBuffer);
- nextLeftOff = BSTNodeUtil.getLeftChildOffset(minFIx, minOff, frames, convertBuffer);
- }
-
- if ((nodeRightChildFrameIx == minFIx) && (nodeRightChildOffset == minOff)) {
- // nrc (node's right child) is the same as min: it only has to
- // adopt node's left subtree
- BSTNodeUtil.setLeftChild(nodeRightChildFrameIx, nodeRightChildOffset, nodeLeftChildFrameIx,
- nodeLeftChildOffset, frames);
- } else { // min is different from nrc
- // min takes over both of node's subtrees, and min's former right
- // subtree takes min's old place under pMin.
- int minRightFIx = BSTNodeUtil.getRightChildFrameIx(minFIx, minOff, frames, convertBuffer);
- int minRightOffset = BSTNodeUtil.getRightChildOffset(minFIx, minOff, frames, convertBuffer);
- BSTNodeUtil.setRightChild(minFIx, minOff, nodeRightChildFrameIx, nodeRightChildOffset, frames);
- BSTNodeUtil.setLeftChild(minFIx, minOff, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
- BSTNodeUtil.setLeftChild(pMinFIx, pMinOff, minRightFIx, minRightOffset, frames);
- }
-
- // Now dealing with the parent
- if (!isNodeNull(parentFrameIx, parentOffset)) {
- if (status == 0) {
- BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, minFIx, minOff, frames);
- }
- } else { // No parent (node was the root)
- root.set(minFIx, minOff);
- }
- return;
- }
-
- // Node has only left child
- else if (!isNodeNull(nodeLeftChildFrameIx, nodeLeftChildOffset)) {
- if (status == 0) {
- BSTNodeUtil
- .setLeftChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset, frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeLeftChildFrameIx, nodeLeftChildOffset,
- frames);
- } else if (status == -1) { // No parent, so node is root
- root.set(nodeLeftChildFrameIx, nodeLeftChildOffset);
- }
- return;
- }
-
- // Node has only right child
- else if (!isNodeNull(nodeRightChildFrameIx, nodeRightChildOffset)) {
- if (status == 0) {
- BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
- frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, nodeRightChildFrameIx, nodeRightChildOffset,
- frames);
- } else if (status == -1) { // No parent, so node is root
- root.set(nodeRightChildFrameIx, nodeRightChildOffset);
- }
- return;
- }
-
- else { // Node is leaf (no children)
- if (status == 0) {
- BSTNodeUtil.setLeftChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- } else if (status == 1) {
- BSTNodeUtil.setRightChild(parentFrameIx, parentOffset, BSTNodeUtil.INVALID_INDEX,
- BSTNodeUtil.INVALID_INDEX, frames);
- } else { // node was the only node in the tree
- root.clear();
- }
- return;
- }
- }
-
- /**
- * Allocation with no splitting but padding
- *
- * @param node
- * @param parent
- * @param result
- * is the container sent by the caller to hold the results
- */
- private void allocate(Slot node, Slot parent, int length, Slot result) {
- int nextFix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
- int nextOff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
- if (!isNodeNull(nextFix, nextOff)) {
- int nextOfNextFIx = BSTNodeUtil.getNextFrameIx(nextFix, nextOff, frames, convertBuffer);
- int nextOfNextOffset = BSTNodeUtil.getNextOffset(nextFix, nextOff, frames, convertBuffer);
- BSTNodeUtil.setNext(node.getFrameIx(), node.getOffset(), nextOfNextFIx, nextOfNextOffset, frames);
- if (!isNodeNull(nextOfNextFIx, nextOfNextOffset)) {
- BSTNodeUtil.setPrev(nextOfNextFIx, nextOfNextOffset, node.getFrameIx(), node.getOffset(), frames);
- }
- BSTNodeUtil.setHeaderFooter(nextFix, nextOff, length, false, frames);
- result.set(nextFix, nextOff);
- return;
- }
-
- fixTreePtrs(node.getFrameIx(), node.getOffset(), parent.getFrameIx(), parent.getOffset());
- BSTNodeUtil.setHeaderFooter(node.getFrameIx(), node.getOffset(), length, false, frames);
- result.copy(node);
- }
-
- /**
- * Removes the free slot at (fix, off) from the free-slot structure. A slot
- * that has a predecessor in its list is simply unlinked from the list. A
- * slot without a predecessor is the head of its list and therefore a BST
- * node: its successor in the list, when there is one, takes over its place
- * in the tree; otherwise the node is taken out of the tree.
- *
- * @param fix
- * frame index of the slot to remove
- * @param off
- * offset of the slot to remove
- */
- private void removeFromList(int fix, int off) {
- int nextFIx = BSTNodeUtil.getNextFrameIx(fix, off, frames, convertBuffer);
- int nextOffset = BSTNodeUtil.getNextOffset(fix, off, frames, convertBuffer);
- int prevFIx = BSTNodeUtil.getPrevFrameIx(fix, off, frames, convertBuffer);
- int prevOffset = BSTNodeUtil.getPrevOffset(fix, off, frames, convertBuffer);
- // Slot is in the middle of its list: bridge its two neighbours.
- if (!isNodeNull(prevFIx, prevOffset) && !isNodeNull(nextFIx, nextOffset)) {
- BSTNodeUtil.setNext(prevFIx, prevOffset, nextFIx, nextOffset, frames);
- BSTNodeUtil.setPrev(nextFIx, nextOffset, prevFIx, prevOffset, frames);
- BSTNodeUtil.setNext(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- return;
- }
- // Slot is the last one of its list: cut it off its predecessor.
- if (!isNodeNull(prevFIx, prevOffset)) {
- BSTNodeUtil.setNext(prevFIx, prevOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setPrev(fix, off, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- return;
- }
-
- // We need to find the parent, so we can fix the tree
- // From here on fix/off are reused as the cursor of a walk that starts at
- // the root and follows the slot's length.
- // NOTE(review): the walk has no null check on the child pointers; it
- // relies on a node with this length being present in the tree.
- int parentFIx = BSTNodeUtil.INVALID_INDEX;
- int parentOffset = BSTNodeUtil.INVALID_INDEX;
- int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix, off, frames, convertBuffer));
- fix = root.getFrameIx();
- off = root.getOffset();
- int curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
- while (length != curLen) {
- // parentFIx/parentOffset are now the old (current) fix/off
- parentFIx = fix;
- parentOffset = off;
- if (length < curLen) {
- fix = BSTNodeUtil.getLeftChildFrameIx(parentFIx, parentOffset, frames, convertBuffer);
- off = BSTNodeUtil.getLeftChildOffset(parentFIx, parentOffset, frames, convertBuffer);
- } else {
- fix = BSTNodeUtil.getRightChildFrameIx(parentFIx, parentOffset, frames, convertBuffer);
- off = BSTNodeUtil.getRightChildOffset(parentFIx, parentOffset, frames, convertBuffer);
- }
- curLen = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
- }
-
- // it is head of the list (in the tree)
- if (!isNodeNull(nextFIx, nextOffset)) {
- // Promote the next slot of the list: it becomes the new head and
- // inherits both children of the removed head.
- BSTNodeUtil.setPrev(nextFIx, nextOffset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- int nodeLeftChildFIx = BSTNodeUtil.getLeftChildFrameIx(fix, off, frames, convertBuffer);
- int nodeLeftChildOffset = BSTNodeUtil.getLeftChildOffset(fix, off, frames, convertBuffer);
- int nodeRightChildFix = BSTNodeUtil.getRightChildFrameIx(fix, off, frames, convertBuffer);
- int nodeRightChildOffset = BSTNodeUtil.getRightChildOffset(fix, off, frames, convertBuffer);
- BSTNodeUtil.setLeftChild(nextFIx, nextOffset, nodeLeftChildFIx, nodeLeftChildOffset, frames);
- BSTNodeUtil.setRightChild(nextFIx, nextOffset, nodeRightChildFix, nodeRightChildOffset, frames);
- if (!isNodeNull(parentFIx, parentOffset)) {
- int parentLength = BSTNodeUtil.getLength(parentFIx, parentOffset, frames, convertBuffer);
- if (length < parentLength) {
- BSTNodeUtil.setLeftChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
- } else {
- BSTNodeUtil.setRightChild(parentFIx, parentOffset, nextFIx, nextOffset, frames);
- }
- }
-
- if ((root.getFrameIx() == fix) && (root.getOffset() == off)) {
- root.set(nextFIx, nextOffset);
- }
-
- return;
- }
-
- // The head had no successor, so its node is taken out of the tree.
- fixTreePtrs(fix, off, parentFIx, parentOffset);
- }
-
- /**
-  * Clears the first two entries of {@code s}, the parent/node pair used as a
-  * search result.
-  */
- private void clear(Slot[] s) {
-     for (int i = 0; i < 2; i++) {
-         s[i].clear();
-     }
- }
-
- /**
-  * @return {@code true} when ({@code frameIx}, {@code offset}) does not
-  *         designate a slot: either index is invalid, or the frame has not
-  *         been allocated
-  */
- private boolean isNodeNull(int frameIx, int offset) {
-     if (frameIx == BSTNodeUtil.INVALID_INDEX) {
-         return true;
-     }
-     if (offset == BSTNodeUtil.INVALID_INDEX) {
-         return true;
-     }
-     return frames[frameIx] == null;
- }
-
- private boolean shouldSplit(int slotLength, int reqLength) {
- return ((slotLength - reqLength) >= BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE);
- }
-
- private void initNewNode(int frameIx, int offset) {
- BSTNodeUtil.setLeftChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setRightChild(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setNext(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- BSTNodeUtil.setPrev(frameIx, offset, BSTNodeUtil.INVALID_INDEX, BSTNodeUtil.INVALID_INDEX, frames);
- }
-
- private ByteBuffer allocateFrame() throws HyracksDataException {
- return ctx.allocateFrame();
- }
-
- /**
-  * Debugging aid: renders every slot of the managed memory in physical
-  * order, frame by frame and slot by slot, one slot per line.
-  *
-  * The emptiness check inspects the frame table directly. The previous check
-  * tested a freshly built {@code Slot(0, 0)}, so with no frame allocated the
-  * method went on to read the header of the still-null frame 0.
-  *
-  * @return {@code "memory:\tNull"} when no frame has been allocated yet,
-  *         otherwise {@code "memory:\n"} followed by one line per slot
-  */
- public String debugPrintMemory() {
-     if (frames.length == 0 || isNodeNull(0, 0)) {
-         return "memory:\tNull";
-     }
-
-     final int slotOverhead = 2 * BSTNodeUtil.HEADER_SIZE;
-     final StringBuilder dump = new StringBuilder("memory:\n");
-     int fix = 0;
-     int off = 0;
-     while (!isNodeNull(fix, off)) {
-         dump.append(debugPrintSlot(fix, off)).append("\n");
-
-         final int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(fix, off, frames, convertBuffer));
-         final int nextOff = off + length + slotOverhead;
-         if (nextOff < frameSize) {
-             // The next slot lives in the same frame.
-             off = nextOff;
-         } else if (fix == frames.length - 1) {
-             // End of the last frame: nothing left to visit.
-             fix = BSTNodeUtil.INVALID_INDEX;
-             off = BSTNodeUtil.INVALID_INDEX;
-         } else {
-             // Continue at the start of the following frame; the loop stops
-             // there if that frame has not been allocated.
-             fix++;
-             off = 0;
-         }
-     }
-     return dump.toString();
- }
-
- /**
-  * Debugging aid: renders the BST of free slots starting at the root.
-  *
-  * @return the rendered tree, or {@code "Null"} when the tree is empty
-  */
- public String debugPrintTree() {
-     final Slot start = new Slot();
-     start.copy(root);
-     return start.isNull() ? "Null" : debugPrintSubTree(start);
- }
-
- private String debugPrintSubTree(Slot r) {
- Slot node = new Slot();
- node.copy(r);
- int fix = node.getFrameIx();
- int off = node.getOffset();
- int lfix = BSTNodeUtil.getLeftChildFrameIx(node, frames, convertBuffer);
- int loff = BSTNodeUtil.getLeftChildOffset(node, frames, convertBuffer);
- int rfix = BSTNodeUtil.getRightChildFrameIx(node, frames, convertBuffer);
- int roff = BSTNodeUtil.getRightChildOffset(node, frames, convertBuffer);
- int nfix = BSTNodeUtil.getNextFrameIx(node, frames, convertBuffer);
- int noff = BSTNodeUtil.getNextOffset(node, frames, convertBuffer);
- int pfix = BSTNodeUtil.getPrevFrameIx(node, frames, convertBuffer);
- int poff = BSTNodeUtil.getPrevOffset(node, frames, convertBuffer);
-
- String s = "{" + r.getFrameIx() + ", " + r.getOffset() + " (Len: "
- + BSTNodeUtil.getLength(fix, off, frames, convertBuffer) + ") - " + "(LC: "
- + debugPrintSlot(lfix, loff) + ") - " + "(RC: " + debugPrintSlot(rfix, roff) + ") - " + "(NX: "
- + debugPrintSlot(nfix, noff) + ") - " + "(PR: " + debugPrintSlot(pfix, poff) + ") }\n";
- if (!isNodeNull(lfix, loff)) {
- s += debugPrintSubTree(new Slot(lfix, loff)) + "\n";
- }
- if (!isNodeNull(rfix, roff)) {
- s += debugPrintSubTree(new Slot(rfix, roff)) + "\n";
- }
-
- return s;
- }
-
- private String debugPrintSlot(int fix, int off) {
- if (isNodeNull(fix, off)) {
- return BSTNodeUtil.INVALID_INDEX + ", " + BSTNodeUtil.INVALID_INDEX;
- }
- int l = BSTNodeUtil.getLength(fix, off, frames, convertBuffer);
- int al = BSTNodeUtil.getActualLength(l);
- boolean f = BSTNodeUtil.isFree(fix, off, frames);
- return fix + ", " + off + " (free: " + f + ") (Len: " + l + ") (actual len: " + al + ") ";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
deleted file mode 100644
index 8a1bcd3..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-
-/**
- * @author pouria
- * Implements utility methods, used extensively and repeatedly within
- * the BSTMemMgr.
- * Mainly includes methods to set/get different types of pointers,
- * required and accessed within BST traversal, along with the methods
- * for setting/getting length/header/footer of free slots, which have
- * been used as the containers for BST nodes.
- */
-public class BSTNodeUtil {
-
- static final int MINIMUM_FREE_SLOT_SIZE = 32;
-
- private static final int FRAME_PTR_SIZE = 4;
- private static final int OFFSET_SIZE = 2;
-
- static final int HEADER_SIZE = 2;
- private static final int HEADER_INDEX = 0;
-
- private static final int LEFT_CHILD_FRAME_INDEX = HEADER_INDEX + HEADER_SIZE;
- private static final int LEFT_CHILD_OFFSET_INDEX = LEFT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
-
- private static final int RIGHT_CHILD_FRAME_INDEX = LEFT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
- private static final int RIGHT_CHILD_OFFSET_INDEX = RIGHT_CHILD_FRAME_INDEX + FRAME_PTR_SIZE;
-
- private static final int NEXT_FRAME_INDEX = RIGHT_CHILD_OFFSET_INDEX + OFFSET_SIZE;
- private static final int NEXT_OFFSET_INDEX = NEXT_FRAME_INDEX + FRAME_PTR_SIZE;
-
- private static final int PREV_FRAME_INDEX = NEXT_OFFSET_INDEX + OFFSET_SIZE;
- private static final int PREV_OFFSET_INDEX = PREV_FRAME_INDEX + FRAME_PTR_SIZE;
-
- private static final byte INVALID = -128;
- private static final byte MASK = 127;
- static final int INVALID_INDEX = -1;
-
- /*
- * Structure of a free slot:
- * [HEADER][LEFT_CHILD][RIGHT_CHILD][NEXT][PREV]...[FOOTER] MSB in the
- * HEADER is set to 1 in a free slot
- *
- * Structure of a used slot: [HEADER]...[FOOTER] MSB in the HEADER is set to
- * 0 in a used slot
- */
-
- static int getLeftChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getLeftChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getLeftChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getLeftChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getLeftChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
-
- }
-
- static int getLeftChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
- }
-
- static void setLeftChild(Slot node, Slot lc, ByteBuffer[] frames) {
- setLeftChild(node.getFrameIx(), node.getOffset(), lc.getFrameIx(), lc.getOffset(), frames);
- }
-
- static void setLeftChild(int nodeFix, int nodeOff, int lcFix, int lcOff, ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, lcFix);
- storeInt(frames[nodeFix], nodeOff + LEFT_CHILD_OFFSET_INDEX, OFFSET_SIZE, lcOff);
- }
-
- static int getRightChildFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getRightChildFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getRightChildOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getRightChildOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getRightChildFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
- }
-
- static int getRightChildOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
- }
-
- static void setRightChild(Slot node, Slot rc, ByteBuffer[] frames) {
- setRightChild(node.getFrameIx(), node.getOffset(), rc.getFrameIx(), rc.getOffset(), frames);
- }
-
- static void setRightChild(int nodeFix, int nodeOff, int rcFix, int rcOff, ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_FRAME_INDEX, FRAME_PTR_SIZE, rcFix);
- storeInt(frames[nodeFix], nodeOff + RIGHT_CHILD_OFFSET_INDEX, OFFSET_SIZE, rcOff);
- }
-
- static int getNextFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getNextFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getNextOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getNextOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getNextFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
- }
-
- static int getNextOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + NEXT_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
- }
-
- static void setNext(Slot node, Slot next, ByteBuffer[] frames) {
- setNext(node.getFrameIx(), node.getOffset(), next.getFrameIx(), node.getOffset(), frames);
- }
-
- static void setNext(int nodeFix, int nodeOff, int nFix, int nOff, ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + NEXT_FRAME_INDEX, FRAME_PTR_SIZE, nFix);
- storeInt(frames[nodeFix], nodeOff + NEXT_OFFSET_INDEX, OFFSET_SIZE, nOff);
- }
-
- static int getPrevFrameIx(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getPrevFrameIx(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getPrevOffset(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getPrevOffset(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getPrevFrameIx(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + PREV_FRAME_INDEX, FRAME_PTR_SIZE, convertBuffer));
- }
-
- static int getPrevOffset(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return (retrieveAsInt(frames[frameIx], offset + PREV_OFFSET_INDEX, OFFSET_SIZE, convertBuffer));
- }
-
- static void setPrev(Slot node, Slot prev, ByteBuffer[] frames) {
- setPrev(node.getFrameIx(), node.getOffset(), prev.getFrameIx(), prev.getOffset(), frames);
- }
-
- static void setPrev(int nodeFix, int nodeOff, int pFix, int pOff, ByteBuffer[] frames) {
- storeInt(frames[nodeFix], nodeOff + PREV_FRAME_INDEX, FRAME_PTR_SIZE, pFix);
- storeInt(frames[nodeFix], nodeOff + PREV_OFFSET_INDEX, OFFSET_SIZE, pOff);
- }
-
- static boolean slotsTheSame(Slot s, Slot t) {
- return ((s.getFrameIx() == t.getFrameIx()) && (s.getOffset() == t.getOffset()));
- }
-
- static void setHeaderFooter(int frameIx, int offset, int usedLength, boolean isFree, ByteBuffer[] frames) {
- int slotLength = getActualLength(usedLength);
- int footerOffset = offset + HEADER_SIZE + slotLength;
- storeInt(frames[frameIx], offset, HEADER_SIZE, usedLength);
- storeInt(frames[frameIx], footerOffset, HEADER_SIZE, usedLength);
- setFree(frameIx, offset, isFree, frames);
- setFree(frameIx, footerOffset, isFree, frames);
- }
-
- static int getLength(Slot s, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- return getLength(s.getFrameIx(), s.getOffset(), frames, convertBuffer);
- }
-
- static int getLength(int frameIx, int offset, ByteBuffer[] frames, ByteBuffer convertBuffer) {
- convertBuffer.clear();
- for (int i = 0; i < 4 - HEADER_SIZE; i++) { // padding
- convertBuffer.put(i, (byte) 0x00);
- }
-
- convertBuffer.put(4 - HEADER_SIZE, (byte) ((frames[frameIx].get(offset)) & (MASK)));
- System.arraycopy(frames[frameIx].array(), offset + 1, convertBuffer.array(), 5 - HEADER_SIZE, HEADER_SIZE - 1);
- return convertBuffer.getInt(0);
- }
-
- // MSB equal to 1 means FREE
- static boolean isFree(int frameIx, int offset, ByteBuffer[] frames) {
- return ((((frames[frameIx]).array()[offset]) & 0x80) == 0x80);
- }
-
- static void setFree(int frameIx, int offset, boolean free, ByteBuffer[] frames) {
- if (free) { // set MSB to 1 (for free)
- frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) | 0x80));
- } else { // set MSB to 0 (for used)
- frames[frameIx].put(offset, (byte) (((frames[frameIx]).array()[offset]) & 0x7F));
- }
- }
-
- static int getActualLength(int l) {
- int r = (l + 2 * HEADER_SIZE) % MINIMUM_FREE_SLOT_SIZE;
- return (r == 0 ? l : (l + (BSTNodeUtil.MINIMUM_FREE_SLOT_SIZE - r)));
- }
-
- private static int retrieveAsInt(ByteBuffer b, int fromIndex, int size, ByteBuffer convertBuffer) {
- if ((b.get(fromIndex) & INVALID) == INVALID) {
- return INVALID_INDEX;
- }
-
- convertBuffer.clear();
- for (int i = 0; i < 4 - size; i++) { // padding
- convertBuffer.put(i, (byte) 0x00);
- }
-
- System.arraycopy(b.array(), fromIndex, convertBuffer.array(), 4 - size, size);
- return convertBuffer.getInt(0);
- }
-
- private static void storeInt(ByteBuffer b, int fromIndex, int size, int value) {
- if (value == INVALID_INDEX) {
- b.put(fromIndex, INVALID);
- return;
- }
- for (int i = 0; i < size; i++) {
- b.put(fromIndex + i, (byte) ((value >>> (8 * ((size - 1 - i)))) & 0xff));
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index e1315e7..3dc8b41 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -1,64 +1,42 @@
/*
* Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.*;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
-public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+import java.util.List;
- private static final int SORT_ACTIVITY_ID = 0;
- private static final int MERGE_ACTIVITY_ID = 1;
+public class ExternalSortOperatorDescriptor extends AbstractSorterOperatorDescriptor {
- private final int[] sortFields;
- private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final int framesLimit;
+ private static final long serialVersionUID = 1L;
private Algorithm alg = Algorithm.MERGE_SORT;
+ private EnumFreeSlotPolicy policy = EnumFreeSlotPolicy.LAST_FIT;
+ private final int outputLimit;
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor, Algorithm alg) {
- this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
- this.alg = alg;
+ this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
+ EnumFreeSlotPolicy.LAST_FIT);
}
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
@@ -69,127 +47,52 @@ public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
- super(spec, 1, 1);
- this.framesLimit = framesLimit;
- this.sortFields = sortFields;
- this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
- this.comparatorFactories = comparatorFactories;
- if (framesLimit <= 1) {
- throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
- }
- recordDescriptors[0] = recordDescriptor;
+ this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+ Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT);
}
@Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
- MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
- builder.addActivity(this, sa);
- builder.addSourceEdge(0, sa, 0);
-
- builder.addActivity(this, ma);
- builder.addTargetEdge(0, ma, 0);
-
- builder.addBlockingEdge(sa, ma);
+ public AbstractSorterOperatorDescriptor.SortActivity getSortActivity(ActivityId id) {
+ return new AbstractSorterOperatorDescriptor.SortActivity(id) {
+ @Override
+ protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
+ return new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDescriptors[0], alg, policy, framesLimit, outputLimit);
+ }
+ };
}
- public static class SortTaskState extends AbstractStateObject {
- private List<IFrameReader> runs;
- private IFrameSorter frameSorter;
-
- public SortTaskState() {
- }
-
- private SortTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
+ @Override
+ public AbstractSorterOperatorDescriptor.MergeActivity getMergeActivity(ActivityId id) {
+ return new AbstractSorterOperatorDescriptor.MergeActivity(id) {
+ @Override
+ protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter, List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators,
+ INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+ return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators,
+ nmkComputer, recordDescriptors[0], necessaryFrames, outputLimit, writer);
+ }
+ };
}
- private class SortActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public SortActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private ExternalSortRunGenerator runGen;
-
- @Override
- public void open() throws HyracksDataException {
- runGen = new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0], alg, framesLimit);
- runGen.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- runGen.nextFrame(buffer);
- }
-
- @Override
- public void close() throws HyracksDataException {
- SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
- getActivityId(), partition));
- runGen.close();
- state.runs = runGen.getRuns();
- state.frameSorter = runGen.getFrameSorter();
- ctx.setStateObject(state);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- runGen.fail();
- }
- };
- return op;
- }
+ public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy) {
+ this(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor, alg,
+ policy, Integer.MAX_VALUE);
}
- private class MergeActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public MergeActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
- SORT_ACTIVITY_ID), partition));
- List<IFrameReader> runs = state.runs;
- IFrameSorter frameSorter = state.frameSorter;
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
- : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
- comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
- merger.process();
- }
- };
- return op;
+ public ExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, Algorithm alg, EnumFreeSlotPolicy policy, int outputLimit) {
+ super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+ if (framesLimit <= 1) {
+ throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
}
+ this.alg = alg;
+ this.policy = policy;
+ this.outputLimit = outputLimit;
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index 3736fca..4048726 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -1,24 +1,21 @@
/*
* Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -27,77 +24,89 @@ import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotSmallestFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotBiggestFirst;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameFreeSlotPolicy;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
-public class ExternalSortRunGenerator implements IFrameWriter {
- private final IHyracksTaskContext ctx;
- private final IFrameSorter frameSorter;
- private final List<IFrameReader> runs;
- private final int maxSortFrames;
+public class ExternalSortRunGenerator extends AbstractSortRunGenerator {
+
+ protected final IHyracksTaskContext ctx;
+ protected final IFrameSorter frameSorter;
+ protected final int maxSortFrames;
public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, Algorithm alg, int framesLimit) throws HyracksDataException {
+ this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg,
+ EnumFreeSlotPolicy.LAST_FIT, framesLimit);
+ }
+
+ public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
+ throws HyracksDataException {
+ this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
+ Integer.MAX_VALUE);
+ }
+
+ public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
+ throws HyracksDataException {
this.ctx = ctx;
+ maxSortFrames = framesLimit - 1;
+
+ IFrameFreeSlotPolicy freeSlotPolicy = null;
+ switch (policy) {
+ case SMALLEST_FIT:
+ freeSlotPolicy = new FrameFreeSlotSmallestFit();
+ break;
+ case LAST_FIT:
+ freeSlotPolicy = new FrameFreeSlotLastFit(maxSortFrames);
+ break;
+ case BIGGEST_FIT:
+ freeSlotPolicy = new FrameFreeSlotBiggestFirst(maxSortFrames);
+ break;
+ }
+ IFrameBufferManager bufferManager = new VariableFrameMemoryManager(
+ new VariableFramePool(ctx, maxSortFrames * ctx.getInitialFrameSize()), freeSlotPolicy);
if (alg == Algorithm.MERGE_SORT) {
- frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
- recordDesc);
+ frameSorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDesc, outputLimit);
} else {
- frameSorter = new FrameSorterQuickSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
- recordDesc);
+ frameSorter = new FrameSorterQuickSort(ctx, bufferManager, sortFields, firstKeyNormalizerFactory,
+ comparatorFactories, recordDesc, outputLimit);
}
- runs = new LinkedList<IFrameReader>();
- maxSortFrames = framesLimit - 1;
- }
-
- @Override
- public void open() throws HyracksDataException {
- runs.clear();
- frameSorter.reset();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (frameSorter.getFrameCount() >= maxSortFrames) {
+ if (!frameSorter.insertFrame(buffer)) {
flushFramesToRun();
- }
- frameSorter.insertFrame(buffer);
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (frameSorter.getFrameCount() > 0) {
- if (runs.size() <= 0) {
- frameSorter.sortFrames();
- } else {
- flushFramesToRun();
+ if (!frameSorter.insertFrame(buffer)) {
+ throw new HyracksDataException("The given frame is too big to insert into the sorting memory.");
}
}
}
- private void flushFramesToRun() throws HyracksDataException {
- frameSorter.sortFrames();
+ protected RunFileWriter getRunFileWriter() throws HyracksDataException {
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
ExternalSortRunGenerator.class.getSimpleName());
- RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- try {
- frameSorter.flushFrames(writer);
- } finally {
- writer.close();
- }
- frameSorter.reset();
- runs.add(writer.createReader());
+ return new RunFileWriter(file, ctx.getIOManager());
}
- @Override
- public void fail() throws HyracksDataException {
+ protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
+ return writer;
}
- public IFrameSorter getFrameSorter() {
+ @Override
+ public ISorter getSorter() {
return frameSorter;
}
- public List<IFrameReader> getRuns() {
- return runs;
- }
}
\ No newline at end of file