You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:22 UTC
[05/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
new file mode 100644
index 0000000..8f8518c
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.ITupleBufferAccessor;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.ITupleBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparable;
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparableFactory;
+import edu.uci.ics.hyracks.dataflow.std.structures.MaxHeap;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Keeps the {@code topK} smallest tuples seen so far in a bounded {@link MaxHeap} whose entries point into an
+ * {@link ITupleBufferManager}. A new tuple is admitted only if the heap is not yet full or the tuple compares below
+ * the current maximum, in which case the maximum is evicted. After {@link #sort()} the heap's backing array is
+ * ordered ascending and {@link #flush(IFrameWriter)} writes it out; {@link #reset()} must be called before inserting
+ * again.
+ */
+public class TupleSorterHeapSort implements ITupleSorter {
+
+    private static final Logger LOGGER = Logger.getLogger(TupleSorterHeapSort.class.getName());
+
+    /** Orders heap entries by their own {@code compareTo}; used by {@link #sort()} on the heap's backing array. */
+    private static final Comparator<IResetableComparable> entryComparator = new Comparator<IResetableComparable>() {
+        @Override
+        @SuppressWarnings("unchecked") // entries are all HeapEntry instances, so the raw compareTo call is safe
+        public int compare(IResetableComparable o1, IResetableComparable o2) {
+            return o1.compareTo(o2);
+        }
+    };
+
+    /** Factory handed to the {@link MaxHeap} so it can create {@link HeapEntry} instances. */
+    class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> {
+        @Override
+        public IResetableComparable<HeapEntry> createResetableComparable() {
+            return new HeapEntry();
+        }
+    }
+
+    /**
+     * One heap slot: the normalized first key of a tuple plus a pointer to the tuple's bytes in the buffer manager.
+     * Non-static because comparison reads the enclosing sorter's accessors, sort fields and comparators.
+     */
+    class HeapEntry implements IResetableComparable<HeapEntry> {
+        int nmk;
+        TuplePointer tuplePointer;
+
+        public HeapEntry() {
+            tuplePointer = new TuplePointer();
+            nmk = 0;
+        }
+
+        /**
+         * Compares by normalized key first (as unsigned ints), then field by field with the configured comparators.
+         *
+         * @throws IllegalStateException
+         *             wrapping a {@link HyracksDataException} raised by a comparator
+         */
+        @Override
+        public int compareTo(HeapEntry o) {
+            int c = compareNormalizedKeys(nmk, o.nmk);
+            if (c != 0) {
+                return c;
+            }
+            bufferAccessor1.reset(tuplePointer);
+            bufferAccessor2.reset(o.tuplePointer);
+            byte[] b1 = bufferAccessor1.getTupleBuffer().array();
+            byte[] b2 = bufferAccessor2.getTupleBuffer().array();
+
+            for (int f = 0; f < comparators.length; ++f) {
+                int fIdx = sortFields[f];
+                int s1 = bufferAccessor1.getAbsFieldStartOffset(fIdx);
+                int l1 = bufferAccessor1.getFieldLength(fIdx);
+
+                int s2 = bufferAccessor2.getAbsFieldStartOffset(fIdx);
+                int l2 = bufferAccessor2.getFieldLength(fIdx);
+                try {
+                    c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                } catch (HyracksDataException e) {
+                    // compareTo cannot declare checked exceptions; wrap and keep the cause.
+                    throw new IllegalStateException(e);
+                }
+                if (c != 0) {
+                    return c;
+                }
+            }
+            return 0;
+        }
+
+        /** Sets only the normalized key; the tuple pointer is filled in separately by the buffer manager. */
+        public void reset(int nmkey) {
+            nmk = nmkey;
+        }
+
+        @Override
+        public void reset(HeapEntry other) {
+            nmk = other.nmk;
+            tuplePointer.reset(other.tuplePointer);
+        }
+    }
+
+    private final ITupleBufferManager bufferManager;
+    private final ITupleBufferAccessor bufferAccessor1;
+    private final ITupleBufferAccessor bufferAccessor2;
+    private final int topK;
+    private final FrameTupleAppender outputAppender;
+    private final IFrame outputFrame;
+    private final int[] sortFields;
+    private final INormalizedKeyComputer nkc;
+    private final IBinaryComparator[] comparators;
+
+    /** Scratch entry receiving a copy of the current heap maximum. */
+    private final HeapEntry maxEntry;
+    /** Scratch entry describing the tuple being inserted. */
+    private final HeapEntry newEntry;
+
+    private MaxHeap heap;
+    private boolean isSorted;
+
+    /**
+     * @param ctx
+     *            task context used to allocate the output frame
+     * @param bufferManager
+     *            storage for the tuples kept in the heap
+     * @param topK
+     *            capacity of the heap, i.e. the number of tuples retained
+     * @param sortFields
+     *            field indexes compared, in order, by {@code comparatorFactories}
+     * @param firstKeyNormalizerFactory
+     *            normalizer for {@code sortFields[0]}, or {@code null} to skip normalized-key comparison
+     * @param comparatorFactories
+     *            one comparator factory per sort field
+     */
+    public TupleSorterHeapSort(IHyracksTaskContext ctx, ITupleBufferManager bufferManager, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories)
+            throws HyracksDataException {
+        this.bufferManager = bufferManager;
+        this.bufferAccessor1 = bufferManager.getTupleAccessor();
+        this.bufferAccessor2 = bufferManager.getTupleAccessor();
+        this.topK = topK;
+        this.outputFrame = new VSizeFrame(ctx);
+        this.outputAppender = new FrameTupleAppender();
+        this.sortFields = sortFields;
+        this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+        this.comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+
+        this.heap = new MaxHeap(new HeapEntryFactory(), topK);
+        this.maxEntry = new HeapEntry();
+        this.newEntry = new HeapEntry();
+        this.isSorted = false;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return heap.getNumEntries();
+    }
+
+    /**
+     * Offers one tuple to the top-K heap.
+     *
+     * @return {@code true} if the tuple was stored or was deliberately dropped because it does not rank below the
+     *         current maximum of a full heap; {@code false} if the buffer manager refused to store it
+     * @throws HyracksDataException
+     *             if called after {@link #sort()} without an intervening {@link #reset()}
+     */
+    @Override
+    public boolean insertTuple(IFrameTupleAccessor frameTupleAccessor, int index) throws HyracksDataException {
+        if (isSorted) {
+            throw new HyracksDataException(
+                    "The Heap haven't be reset after sorting, the order of using this class is not correct.");
+        }
+        int nmkey = getPNK(frameTupleAccessor, index);
+        if (heap.getNumEntries() >= topK) {
+            heap.peekMax(maxEntry);
+            if (compareTuple(frameTupleAccessor, index, nmkey, maxEntry) >= 0) {
+                // Not smaller than the current maximum: it cannot be in the top K.
+                return true;
+            }
+        }
+
+        newEntry.reset(nmkey);
+        if (!bufferManager.insertTuple(frameTupleAccessor, index, newEntry.tuplePointer)) {
+            return false;
+        }
+        if (heap.getNumEntries() < topK) {
+            heap.insert(newEntry);
+        } else {
+            // maxEntry still holds the peeked maximum; free its storage before it leaves the heap.
+            bufferManager.deleteTuple(maxEntry.tuplePointer);
+            heap.replaceMax(newEntry);
+        }
+        return true;
+    }
+
+    /** Returns the normalized key of the tuple's first sort field, or 0 when no normalizer is configured. */
+    private int getPNK(IFrameTupleAccessor fta, int tIx) {
+        if (nkc == null) {
+            return 0;
+        }
+        int sfIdx = sortFields[0];
+        return nkc.normalize(fta.getBuffer().array(), fta.getAbsoluteFieldStartOffset(tIx, sfIdx),
+                fta.getFieldLength(tIx, sfIdx));
+    }
+
+    /**
+     * Compares two normalized keys as unsigned 32-bit values.
+     *
+     * @return 0 if equal, -1 if {@code nmk1} is smaller, 1 otherwise
+     */
+    private static int compareNormalizedKeys(int nmk1, int nmk2) {
+        if (nmk1 == nmk2) {
+            return 0;
+        }
+        return (nmk1 & 0xffffffffL) < (nmk2 & 0xffffffffL) ? -1 : 1;
+    }
+
+    /**
+     * Compares an incoming tuple (not yet in the buffer manager) against a stored heap entry, using the same
+     * ordering as {@link HeapEntry#compareTo(HeapEntry)}.
+     */
+    private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int nmkey, HeapEntry entry)
+            throws HyracksDataException {
+        int c = compareNormalizedKeys(nmkey, entry.nmk);
+        if (c != 0) {
+            return c;
+        }
+        bufferAccessor2.reset(entry.tuplePointer);
+        byte[] b1 = frameTupleAccessor.getBuffer().array();
+        byte[] b2 = bufferAccessor2.getTupleBuffer().array();
+
+        for (int f = 0; f < comparators.length; ++f) {
+            int fIdx = sortFields[f];
+            int s1 = frameTupleAccessor.getAbsoluteFieldStartOffset(tid, fIdx);
+            int l1 = frameTupleAccessor.getFieldLength(tid, fIdx);
+
+            int s2 = bufferAccessor2.getAbsFieldStartOffset(fIdx);
+            int l2 = bufferAccessor2.getFieldLength(fIdx);
+            c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return getTupleCount() > 0;
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        bufferManager.reset();
+        heap.reset();
+        isSorted = false;
+    }
+
+    /**
+     * Sorts the heap's backing array ascending in place. This breaks the heap invariant, so further inserts are
+     * rejected until {@link #reset()}.
+     */
+    @Override
+    @SuppressWarnings("deprecation")
+    public void sort() throws HyracksDataException {
+        IResetableComparable[] entries = heap.getEntries();
+        int count = heap.getNumEntries();
+        Arrays.sort(entries, 0, count, entryComparator);
+        isSorted = true;
+    }
+
+    /** Drops the heap and closes the buffer manager; the sorter must not be used afterwards. */
+    @Override
+    public void close() {
+        heap = null;
+        bufferManager.close();
+        isSorted = false;
+    }
+
+    /**
+     * Appends every held tuple to {@code writer} in backing-array order (ascending if {@link #sort()} was called
+     * first; this method does not check that), then flushes the remaining output.
+     *
+     * @return the largest frame size observed: the output frame's size, or a larger value returned by
+     *         {@code FrameUtils.appendToWriter} (presumably the size of a frame it flushed)
+     */
+    @Override
+    @SuppressWarnings("deprecation")
+    public int flush(IFrameWriter writer) throws HyracksDataException {
+        outputAppender.reset(outputFrame, true);
+        int maxFrameSize = outputFrame.getFrameSize();
+        int numEntries = heap.getNumEntries();
+        IResetableComparable[] entries = heap.getEntries();
+        int io = 0;
+        for (int i = 0; i < numEntries; i++) {
+            HeapEntry minEntry = (HeapEntry) entries[i];
+            bufferAccessor1.reset(minEntry.tuplePointer);
+            int flushed = FrameUtils.appendToWriter(writer, outputAppender, bufferAccessor1.getTupleBuffer().array(),
+                    bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
+            if (flushed > 0) {
+                maxFrameSize = Math.max(maxFrameSize, flushed);
+                io++;
+            }
+        }
+        // The output frame may have grown while appending large tuples.
+        maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
+        outputAppender.flush(writer, true);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
+        }
+        return maxFrameSize;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java
new file mode 100644
index 0000000..26da494
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/EnumFreeSlotPolicy.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+public enum EnumFreeSlotPolicy {
+ SMALLEST_FIT,
+ LAST_FIT,
+ BIGGEST_FIT,
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java
new file mode 100644
index 0000000..085a1e8
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotBiggestFirst.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparable;
+import edu.uci.ics.hyracks.dataflow.std.structures.IResetableComparableFactory;
+import edu.uci.ics.hyracks.dataflow.std.structures.MaxHeap;
+
+/**
+ * Free-slot policy that always hands out the frame with the most free space, provided it can hold the request.
+ * Frames are kept in a {@link MaxHeap} keyed by free space.
+ */
+public class FrameFreeSlotBiggestFirst implements IFrameFreeSlotPolicy {
+    private static final int INVALID = -1;
+
+    /** Factory handed to the {@link MaxHeap} for creating {@link SpaceEntry} instances. */
+    static class SpaceEntryFactory implements IResetableComparableFactory<SpaceEntry> {
+        @Override
+        public IResetableComparable<SpaceEntry> createResetableComparable() {
+            return new SpaceEntry();
+        }
+    }
+
+    /** A (free space, frame id) pair ordered by free space; {@link #INVALID} space sorts below any other value. */
+    static class SpaceEntry implements IResetableComparable<SpaceEntry> {
+        int space;
+        int id;
+
+        SpaceEntry() {
+            space = INVALID;
+            id = INVALID;
+        }
+
+        @Override
+        public int compareTo(SpaceEntry o) {
+            if (o.space != space) {
+                if (o.space == INVALID) {
+                    return 1;
+                }
+                if (space == INVALID) {
+                    return -1;
+                }
+                return space < o.space ? -1 : 1;
+            }
+            return 0;
+        }
+
+        @Override
+        public void reset(SpaceEntry other) {
+            space = other.space;
+            id = other.id;
+        }
+
+        void reset(int space, int id) {
+            this.space = space;
+            this.id = id;
+        }
+    }
+
+    private final MaxHeap heap;
+    /** Scratch entry reused for every peek/pop/insert to avoid allocation. */
+    private final SpaceEntry tempEntry;
+
+    public FrameFreeSlotBiggestFirst(int initialCapacity) {
+        heap = new MaxHeap(new SpaceEntryFactory(), initialCapacity);
+        tempEntry = new SpaceEntry();
+    }
+
+    /**
+     * Pops the frame with the largest free space if that space is at least {@code tobeInsertedSize}.
+     *
+     * @return the frame id, or -1 if the heap is empty or even the biggest slot is too small
+     */
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        if (!heap.isEmpty()) {
+            heap.peekMax(tempEntry);
+            if (tempEntry.space >= tobeInsertedSize) {
+                heap.getMax(tempEntry);
+                return tempEntry.id;
+            }
+        }
+        return INVALID;
+    }
+
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        tempEntry.reset(freeSpace, frameID);
+        heap.insert(tempEntry);
+    }
+
+    @Override
+    public void reset() {
+        heap.reset();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java
new file mode 100644
index 0000000..0bfcf38
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotLastFit.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.util.Arrays;
+
+/**
+ * Free-slot policy that scans from the most recently registered frame backwards and returns the first one with
+ * enough free space. Popped {@link FrameSpace} objects are parked just past the live region so that
+ * {@link #pushNewFrame(int, int)} can reuse them instead of allocating.
+ */
+public class FrameFreeSlotLastFit implements IFrameFreeSlotPolicy {
+    private static final int INITIAL_CAPACITY = 10;
+
+    /** Mutable (frame id, free space) pair. */
+    private static class FrameSpace {
+        int frameId;
+        int freeSpace;
+
+        FrameSpace(int frameId, int freeSpace) {
+            reset(frameId, freeSpace);
+        }
+
+        void reset(int frameId, int freeSpace) {
+            this.frameId = frameId;
+            this.freeSpace = freeSpace;
+        }
+    }
+
+    private FrameSpace[] frameSpaces;
+    /** Number of live entries; frameSpaces[0, size) are registered slots. */
+    private int size;
+
+    /**
+     * @param maxFrames
+     *            initial array capacity; the array grows on demand, so 0 is allowed
+     */
+    public FrameFreeSlotLastFit(int maxFrames) {
+        frameSpaces = new FrameSpace[maxFrames];
+        size = 0;
+    }
+
+    public FrameFreeSlotLastFit() {
+        this(INITIAL_CAPACITY);
+    }
+
+    /**
+     * Removes and returns the last-registered frame whose free space is at least {@code tobeInsertedSize}.
+     *
+     * @return the frame id, or -1 if no registered frame is large enough
+     */
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        for (int i = size - 1; i >= 0; i--) {
+            if (frameSpaces[i].freeSpace >= tobeInsertedSize) {
+                FrameSpace ret = frameSpaces[i];
+                // Close the gap, then park the popped object just past the live region for later reuse.
+                System.arraycopy(frameSpaces, i + 1, frameSpaces, i, size - i - 1);
+                frameSpaces[--size] = ret;
+                return ret.frameId;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        if (size >= frameSpaces.length) {
+            // Math.max guards a zero initial capacity, where doubling would leave the array empty.
+            frameSpaces = Arrays.copyOf(frameSpaces, Math.max(1, size * 2));
+        }
+        if (frameSpaces[size] == null) {
+            frameSpaces[size++] = new FrameSpace(frameID, freeSpace);
+        } else {
+            frameSpaces[size++].reset(frameID, freeSpace);
+        }
+    }
+
+    @Override
+    public void reset() {
+        size = 0;
+        Arrays.fill(frameSpaces, null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java
new file mode 100644
index 0000000..69e1911
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/FrameFreeSlotSmallestFit.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Free-slot policy that returns a frame with the smallest free space that still fits the request. Among frames with
+ * the same free space, the earliest registered one is returned first.
+ */
+public class FrameFreeSlotSmallestFit implements IFrameFreeSlotPolicy {
+
+    /** Free space -> ids of frames with exactly that much free space, in registration order. */
+    private final TreeMap<Integer, LinkedList<Integer>> freeSpaceIndex;
+
+    public FrameFreeSlotSmallestFit() {
+        freeSpaceIndex = new TreeMap<>();
+    }
+
+    /**
+     * @return the id of a frame with the smallest free space &gt;= {@code tobeInsertedSize}, or -1 if none exists
+     */
+    @Override
+    public int popBestFit(int tobeInsertedSize) {
+        Map.Entry<Integer, LinkedList<Integer>> entry = freeSpaceIndex.ceilingEntry(tobeInsertedSize);
+        if (entry == null) {
+            return -1;
+        }
+        LinkedList<Integer> frameIds = entry.getValue();
+        int id = frameIds.removeFirst();
+        if (frameIds.isEmpty()) {
+            // Drop empty buckets so ceilingEntry never lands on a key with no frames.
+            freeSpaceIndex.remove(entry.getKey());
+        }
+        return id;
+    }
+
+    @Override
+    public void pushNewFrame(int frameID, int freeSpace) {
+        // Exact-key lookup; avoids comparing a boxed Integer key against an int with !=.
+        LinkedList<Integer> frameIds = freeSpaceIndex.get(freeSpace);
+        if (frameIds == null) {
+            frameIds = new LinkedList<>();
+            freeSpaceIndex.put(freeSpace, frameIds);
+        }
+        frameIds.add(frameID);
+    }
+
+    @Override
+    public void reset() {
+        freeSpaceIndex.clear();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
new file mode 100644
index 0000000..9a52efa
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameBufferManager.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Manages a set of frames held in memory buffers, addressed by frame index.
+ */
+public interface IFrameBufferManager {
+
+    /**
+     * Returns counters and flags to their initial state. Implementations must not release pre-allocated resources
+     * here.
+     *
+     * @throws edu.uci.ics.hyracks.api.exceptions.HyracksDataException
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * Looks up one of the managed memory buffers.
+     *
+     * @param frameIndex
+     *            index of the frame
+     * @return the buffer that holds the requested frame
+     */
+    ByteBuffer getFrame(int frameIndex);
+
+    /**
+     * Tells where a frame begins inside its buffer.
+     *
+     * @param frameIndex
+     *            index of the frame
+     * @return the offset of the frame within the buffer returned by {@link #getFrame(int)}
+     */
+    int getFrameStartOffset(int frameIndex);
+
+    /**
+     * Tells how long a frame is inside its buffer.
+     *
+     * @param frameIndex
+     *            index of the frame
+     * @return the frame's length
+     */
+    int getFrameSize(int frameIndex);
+
+    /**
+     * @return how many frames this manager holds
+     */
+    int getNumFrames();
+
+    /**
+     * Writes an entire frame into the managed memory.
+     *
+     * @param frame
+     *            the source frame
+     * @return the id assigned to the stored frame, or -1 if it could not be inserted
+     */
+    int insertFrame(ByteBuffer frame) throws HyracksDataException;
+
+    /**
+     * Closes this manager. NOTE(review): presumably releases the managed memory; confirm against implementations.
+     */
+    void close();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
new file mode 100644
index 0000000..57a8094
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFrameFreeSlotPolicy.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+/**
+ * Index of frames that still have free space, used to pick a frame for new data.
+ */
+public interface IFrameFreeSlotPolicy {
+
+    /**
+     * Chooses the frame that best fits a pending insertion according to this policy and removes it from the index.
+     *
+     * @param tobeInsertedSize
+     *            the number of bytes needed, including metadata such as the field offsets and the extra space for
+     *            the tuple count
+     * @return the chosen frame id, or -1 if no suitable frame is found
+     */
+    int popBestFit(int tobeInsertedSize);
+
+    /**
+     * Adds a frame's free slot to the index.
+     *
+     * @param frameID
+     *            id of the frame
+     * @param freeSpace
+     *            free space available in that frame
+     */
+    void pushNewFrame(int frameID, int freeSpace);
+
+    /**
+     * Forgets every registered free slot.
+     */
+    void reset();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
new file mode 100644
index 0000000..1e5be25
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/IFramePool.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A memory budget from which frames of varying sizes are carved.
+ */
+public interface IFramePool {
+
+    /**
+     * @return the minimum frame size handed out by this pool
+     */
+    int getMinFrameSize();
+
+    /**
+     * @return the total memory budget of this pool, in bytes
+     */
+    int getMemoryBudgetBytes();
+
+    /**
+     * Allocates a frame of the requested size. <br>
+     * Returns {@code null} when a frame of that size cannot be allocated.
+     *
+     * @param frameSize
+     *            the exact size of the frame wanted
+     * @return the new frame, or {@code null} on failure
+     * @throws HyracksDataException
+     */
+    ByteBuffer allocateFrame(int frameSize) throws HyracksDataException;
+
+    /**
+     * Returns counters to their initial state while keeping pre-allocated resources.
+     */
+    void reset();
+
+    /**
+     * Releases the pre-allocated resources.
+     */
+    void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
new file mode 100644
index 0000000..49a664c
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferAccessor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Cursor over a single tuple stored in a tuple buffer manager. Call {@link #reset(TuplePointer)} to position it; the
+ * other methods then describe that tuple.
+ */
+public interface ITupleBufferAccessor {
+
+    /** Positions this accessor on the tuple addressed by {@code tuplePointer}. */
+    void reset(TuplePointer tuplePointer);
+
+    /** @return the buffer containing the current tuple (callers read its backing array directly) */
+    ByteBuffer getTupleBuffer();
+
+    /** @return the offset of the current tuple within {@link #getTupleBuffer()} */
+    int getTupleStartOffset();
+
+    /** @return the length of the current tuple */
+    int getTupleLength();
+
+    /** @return the absolute offset of field {@code fieldId} of the current tuple within {@link #getTupleBuffer()} */
+    int getAbsFieldStartOffset(int fieldId);
+
+    /** @return the length of field {@code fieldId} of the current tuple */
+    int getFieldLength(int fieldId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
new file mode 100644
index 0000000..6f94563
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/ITupleBufferManager.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Stores individual tuples in managed memory and hands out {@link TuplePointer}s to them.
+ */
+public interface ITupleBufferManager {
+    /**
+     * Returns counters and flags to their initial state. Implementations must not release pre-allocated resources
+     * here.
+     *
+     * @throws edu.uci.ics.hyracks.api.exceptions.HyracksDataException
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * @return how many tuples this buffer currently holds
+     */
+    int getNumTuples();
+
+    /**
+     * Copies tuple {@code idx} of {@code accessor} into managed memory and records its location in
+     * {@code tuplePointer}.
+     *
+     * @return {@code false} if the tuple could not be stored (NOTE(review): presumably for lack of memory)
+     */
+    boolean insertTuple(IFrameTupleAccessor accessor, int idx, TuplePointer tuplePointer) throws HyracksDataException;
+
+    /** Removes the tuple addressed by {@code tuplePointer}. */
+    void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException;
+
+    /** Closes this manager. */
+    void close();
+
+    /**
+     * @return an accessor for reading stored tuples. NOTE(review): callers obtain several and use them independently,
+     *         so implementations presumably return a fresh instance each time.
+     */
+    ITupleBufferAccessor getTupleAccessor();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
new file mode 100644
index 0000000..834ba2a
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFrameMemoryManager.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.FixedSizeFrame;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/**
+ * An {@link IFrameBufferManager} that packs variable-size logical frames into physical frames obtained from an
+ * {@link IFramePool}. Each inserted frame is copied into the free tail of a physical frame: either a free slot chosen
+ * by the {@link IFrameFreeSlotPolicy}, or a newly allocated physical frame when the policy has nothing suitable.
+ */
+public class VariableFrameMemoryManager implements IFrameBufferManager {
+
+    /** Write cursor of one physical frame: the next free byte is at {@code physicalOffset}. */
+    private static class PhysicalFrameOffset {
+        final IFrame physicalFrame;
+        int physicalOffset;
+
+        PhysicalFrameOffset(IFrame frame, int offset) {
+            physicalFrame = frame;
+            physicalOffset = offset;
+        }
+    }
+
+    /** Location of one logical frame: {@code logicalSize} bytes starting at {@code logicalStart} of a buffer. */
+    private static class LogicalFrameStartSize {
+        final ByteBuffer logicalFrame;
+        final int logicalStart;
+        final int logicalSize;
+
+        LogicalFrameStartSize(ByteBuffer frame, int start, int size) {
+            logicalFrame = frame;
+            logicalStart = start;
+            logicalSize = size;
+        }
+    }
+
+    private final IFramePool framePool;
+    private final IFrameFreeSlotPolicy freeSlotPolicy;
+    private final List<PhysicalFrameOffset> physicalFrameOffsets;
+    private final List<LogicalFrameStartSize> logicalFrameStartSizes;
+
+    public VariableFrameMemoryManager(IFramePool framePool, IFrameFreeSlotPolicy freeSlotPolicy) {
+        this.framePool = framePool;
+        this.freeSlotPolicy = freeSlotPolicy;
+        // the most min-size frames the budget can hold; only used to presize the bookkeeping lists
+        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
+        this.physicalFrameOffsets = new ArrayList<>(maxFrames);
+        this.logicalFrameStartSizes = new ArrayList<>(maxFrames);
+    }
+
+    /**
+     * Finds a physical frame to receive {@code frameSize} bytes. The free-slot policy is asked first; otherwise a new
+     * frame is requested from the pool, its tuple count is zeroed, and it is registered with write offset 0.
+     *
+     * @return the physical frame id, or -1 if the pool could not supply a frame
+     */
+    private int findAvailableFrame(int frameSize) throws HyracksDataException {
+        int frameId = freeSlotPolicy.popBestFit(frameSize);
+        if (frameId >= 0) {
+            return frameId;
+        }
+        ByteBuffer buffer = framePool.allocateFrame(frameSize);
+        if (buffer == null) {
+            return -1;
+        }
+        IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), 0);
+        physicalFrameOffsets.add(new PhysicalFrameOffset(new FixedSizeFrame(buffer), 0));
+        return physicalFrameOffsets.size() - 1;
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        physicalFrameOffsets.clear();
+        logicalFrameStartSizes.clear();
+        freeSlotPolicy.reset();
+        framePool.reset();
+    }
+
+    @Override
+    public ByteBuffer getFrame(int frameIndex) {
+        return logicalFrameStartSizes.get(frameIndex).logicalFrame;
+    }
+
+    @Override
+    public int getFrameStartOffset(int frameIndex) {
+        return logicalFrameStartSizes.get(frameIndex).logicalStart;
+    }
+
+    @Override
+    public int getFrameSize(int frameIndex) {
+        return logicalFrameStartSizes.get(frameIndex).logicalSize;
+    }
+
+    @Override
+    public int getNumFrames() {
+        return logicalFrameStartSizes.size();
+    }
+
+    /**
+     * Copies the whole capacity of {@code frame} into a physical frame and registers it as a new logical frame.
+     * If the physical frame still has room afterwards, the remainder is offered back to the free-slot policy.
+     *
+     * @return the index of the new logical frame, or -1 if no physical frame could be obtained
+     */
+    @Override
+    public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+        int frameSize = frame.capacity();
+        int physicalFrameId = findAvailableFrame(frameSize);
+        if (physicalFrameId < 0) {
+            return -1;
+        }
+        PhysicalFrameOffset target = physicalFrameOffsets.get(physicalFrameId);
+        ByteBuffer buffer = target.physicalFrame.getBuffer();
+        int offset = target.physicalOffset;
+        // start at arrayOffset() so that sliced or wrapped heap buffers are copied from their real first byte
+        System.arraycopy(frame.array(), frame.arrayOffset(), buffer.array(), offset, frameSize);
+        int remaining = buffer.capacity() - offset - frameSize;
+        if (remaining > 0) {
+            freeSlotPolicy.pushNewFrame(physicalFrameId, remaining);
+        }
+        target.physicalOffset = offset + frameSize;
+        logicalFrameStartSizes.add(new LogicalFrameStartSize(buffer, offset, frameSize));
+        return logicalFrameStartSizes.size() - 1;
+    }
+
+    @Override
+    public void close() {
+        physicalFrameOffsets.clear();
+        logicalFrameStartSizes.clear();
+        freeSlotPolicy.reset();
+        framePool.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
new file mode 100644
index 0000000..0d936b4
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableFramePool.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksFrameMgrContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link IFramePool} that hands out frames of arbitrary size within a memory budget.
+ * <p>
+ * Frames are not returned one by one: every allocated frame stays marked as used until {@link #reset()}. A reset
+ * makes all frames reusable again and sorts them by capacity, so that {@link #allocateFrame(int)} can binary-search
+ * for a fitting one. When neither reuse nor a fresh allocation fits the budget, unused frames are released and their
+ * memory is merged into one larger frame.
+ */
+public class VariableFramePool implements IFramePool {
+    public static final int UNLIMITED_MEMORY = -1;
+
+    /** Orders buffers by capacity, smallest first. */
+    private static final Comparator<ByteBuffer> CAPACITY_ORDER = new Comparator<ByteBuffer>() {
+        @Override
+        public int compare(ByteBuffer o1, ByteBuffer o2) {
+            return Integer.compare(o1.capacity(), o2.capacity());
+        }
+    };
+
+    private final IHyracksFrameMgrContext ctx;
+    private final int minFrameSize;
+    private final int memBudget;
+
+    /** Bytes currently obtained from {@code ctx} by this pool. */
+    private int allocateMem;
+    /** Every frame held by the pool; merged-away slots are null. The unused slots are sorted by size increasingly. */
+    private final ArrayList<ByteBuffer> buffers;
+    /** Bit i is set when buffers[i] is handed out; merged-away (null) slots are also marked as used. */
+    private final BitSet used;
+
+    /**
+     * The constructor of the VariableFramePool.
+     *
+     * @param ctx
+     *            context that allocates and deallocates the frame memory
+     * @param memBudgetInBytes
+     *            the memory budget for allocating frames. Any negative value (such as {@link #UNLIMITED_MEMORY}) is
+     *            treated as an unlimited budget.
+     */
+    public VariableFramePool(IHyracksFrameMgrContext ctx, int memBudgetInBytes) {
+        this.ctx = ctx;
+        this.minFrameSize = ctx.getInitialFrameSize();
+        this.allocateMem = 0;
+        if (memBudgetInBytes < 0) {
+            this.memBudget = Integer.MAX_VALUE;
+            this.buffers = new ArrayList<>();
+            this.used = new BitSet();
+        } else {
+            this.memBudget = memBudgetInBytes;
+            int expectedFrames = memBudgetInBytes / minFrameSize;
+            this.buffers = new ArrayList<>(expectedFrames);
+            this.used = new BitSet(expectedFrames);
+        }
+    }
+
+    @Override
+    public int getMinFrameSize() {
+        return minFrameSize;
+    }
+
+    @Override
+    public int getMemoryBudgetBytes() {
+        return memBudget;
+    }
+
+    /**
+     * Returns a cleared frame of at least {@code frameSize} bytes. The pool tries, in order: reusing an unused frame,
+     * allocating a new one within the budget, and merging unused frames.
+     *
+     * @return the frame, or {@code null} if none of these can provide {@code frameSize} bytes
+     */
+    @Override
+    public ByteBuffer allocateFrame(int frameSize) throws HyracksDataException {
+        int frameId = findExistingFrame(frameSize);
+        if (frameId >= 0) {
+            return reuseFrame(frameId);
+        }
+        if (haveEnoughFreeSpace(frameSize)) {
+            return createNewFrame(frameSize);
+        }
+        return mergeExistingFrames(frameSize);
+    }
+
+    private boolean haveEnoughFreeSpace(int frameSize) {
+        // allocateMem never exceeds memBudget, so this subtraction cannot overflow (unlike frameSize + allocateMem
+        // when memBudget is Integer.MAX_VALUE)
+        return frameSize <= memBudget - allocateMem;
+    }
+
+    private static int getFirstUnUsedPos(BitSet used) {
+        return used.nextClearBit(0);
+    }
+
+    private static int getLastUnUsedPos(BitSet used, int lastPos) {
+        return used.previousClearBit(lastPos);
+    }
+
+    /**
+     * Searches the sorted, unused region of {@code buffers} for the smallest unused buffer whose capacity is at
+     * least {@code frameSize}.
+     *
+     * @return its index, or -1 if there is none
+     */
+    private static int binarySearchUnusedBuffer(ArrayList<ByteBuffer> buffers, BitSet used, int frameSize) {
+        int l = getFirstUnUsedPos(used); // to skip the merged null buffers
+        int h = getLastUnUsedPos(used, (buffers.size() - 1)) + 1; // to skip the newly created buffers
+        if (l >= h) {
+            return -1;
+        }
+        int highest = h;
+        int mid = (l + h) >>> 1;
+        while (l < h) {
+            ByteBuffer buffer = buffers.get(mid);
+            if (buffer.capacity() == frameSize) {
+                break;
+            }
+            if (buffer.capacity() < frameSize) {
+                l = mid + 1;
+            } else {
+                h = mid;
+            }
+            mid = (l + h) >>> 1;
+        }
+        // mid is an exact match or the first position whose capacity fits; take the first unused slot from there
+        mid = used.nextClearBit(mid);
+        return mid < highest ? mid : -1;
+    }
+
+    private int findExistingFrame(int frameSize) {
+        return binarySearchUnusedBuffer(buffers, used, frameSize);
+    }
+
+    private ByteBuffer reuseFrame(int id) {
+        used.set(id);
+        buffers.get(id).clear();
+        return buffers.get(id);
+    }
+
+    private ByteBuffer createNewFrame(int frameSize) throws HyracksDataException {
+        buffers.add(ctx.allocateFrame(frameSize));
+        allocateMem += frameSize;
+        return reuseFrame(buffers.size() - 1);
+    }
+
+    /**
+     * Releases unused frames, from the smallest to the largest, until the released memory plus the remaining
+     * budget reaches {@code frameSize}. It then allocates a single frame of that combined size. A released buffer is
+     * replaced by null in the list and its slot is marked as used.
+     * <p>
+     * Frames are only released when the merge can succeed. Otherwise reusable frames would be discarded for
+     * nothing.
+     *
+     * @return the merged frame, or {@code null} if even merging every unused frame is not enough
+     */
+    private ByteBuffer mergeExistingFrames(int frameSize) throws HyracksDataException {
+        int mergedSize = memBudget - allocateMem;
+        int lowBound = getFirstUnUsedPos(used);
+        int highBound = getLastUnUsedPos(used, buffers.size() - 1) + 1;
+
+        long reclaimable = mergedSize;
+        for (int i = lowBound; i < highBound; ++i) {
+            if (!used.get(i)) {
+                reclaimable += buffers.get(i).capacity();
+            }
+        }
+        if (reclaimable < frameSize) {
+            return null;
+        }
+
+        for (int i = lowBound; i < highBound; ++i) {
+            if (!used.get(i)) {
+                mergedSize += deAllocateFrame(i);
+                if (mergedSize >= frameSize) {
+                    return createNewFrame(mergedSize);
+                }
+            }
+        }
+        return null;
+    }
+
+    /** Returns frame {@code id} to the context, leaves a null marked-used slot behind, and returns its capacity. */
+    private int deAllocateFrame(int id) {
+        ByteBuffer frame = buffers.get(id);
+        ctx.deallocateFrames(frame.capacity());
+        buffers.set(id, null);
+        used.set(id);
+        allocateMem -= frame.capacity();
+        return frame.capacity();
+    }
+
+    /** Makes every held frame reusable: drops merged-away slots, sorts by capacity, and clears the used marks. */
+    @Override
+    public void reset() {
+        removeEmptySpot(buffers);
+        Collections.sort(buffers, CAPACITY_ORDER);
+        used.clear();
+    }
+
+    /** Removes null entries in a single pass. */
+    private static void removeEmptySpot(List<ByteBuffer> buffers) {
+        int write = 0;
+        for (int read = 0; read < buffers.size(); read++) {
+            ByteBuffer buffer = buffers.get(read);
+            if (buffer != null) {
+                buffers.set(write++, buffer);
+            }
+        }
+        buffers.subList(write, buffers.size()).clear();
+    }
+
+    /** Returns the memory of every held frame to the context and empties the pool. */
+    @Override
+    public void close() {
+        for (ByteBuffer buffer : buffers) {
+            if (buffer != null) { // merged-away slots were already returned by deAllocateFrame
+                ctx.deallocateFrames(buffer.capacity());
+            }
+        }
+        buffers.clear();
+        used.clear();
+        allocateMem = 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
new file mode 100644
index 0000000..0b077c2
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/buffermanager/VariableTupleMemoryManager.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * An {@link ITupleBufferManager} that stores tuples in variable-size frames drawn from an {@link IFramePool}.
+ * Frames are formatted by {@link DeletableFrameTupleAppender}, so tuples can be deleted in place. The space they free
+ * is reclaimed lazily: all frames are compacted when an insert would otherwise fail.
+ */
+public class VariableTupleMemoryManager implements ITupleBufferManager {
+
+    private final static Logger LOG = Logger.getLogger(VariableTupleMemoryManager.class.getName());
+
+    /**
+     * Bytes a {@link DeletableFrameTupleAppender} frame reserves at its end, besides the per-tuple offset slots: the
+     * 4-byte tuple count and the 4-byte deleted-space counter. NOTE(review): assumes
+     * FrameHelper.getTupleCountOffset(capacity) == capacity - 4; revisit if the frame layout changes.
+     */
+    private static final int FRAME_TRAILER_SIZE = 8;
+
+    /** After an insert, a frame is offered back to the policy only if its contiguous free space exceeds this. */
+    private final int minFreeSpace;
+    private final IFramePool pool;
+    private final IFrameFreeSlotPolicy policy;
+    private final IAppendDeletableFrameTupleAccessor accessor;
+    private final ArrayList<ByteBuffer> frames;
+    private final RecordDescriptor recordDescriptor;
+    private int numTuples;
+    /** Number of times reOrganizeFrames() ran; logged at FINE level by close(). */
+    private int statsReOrg;
+
+    public VariableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
+        this.pool = framePool;
+        int maxFrames = framePool.getMemoryBudgetBytes() / framePool.getMinFrameSize();
+        this.policy = new FrameFreeSlotLastFit(maxFrames);
+        this.accessor = new DeletableFrameTupleAppender(recordDescriptor);
+        this.frames = new ArrayList<>();
+        this.minFreeSpace = calculateMinFreeSpace(recordDescriptor);
+        this.recordDescriptor = recordDescriptor;
+        this.numTuples = 0;
+        this.statsReOrg = 0;
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        pool.reset();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+    }
+
+    @Override
+    public int getNumTuples() {
+        return numTuples;
+    }
+
+    /**
+     * Copies tuple {@code idx} of {@code fta} into a frame with enough contiguous space. If no frame can be found or
+     * allocated, the frames are compacted first, provided some frame has enough total (contiguous + deleted) space.
+     *
+     * @return {@code true} and a filled {@code tuplePointer} on success, {@code false} if the tuple does not fit
+     */
+    @Override
+    public boolean insertTuple(IFrameTupleAccessor fta, int idx, TuplePointer tuplePointer)
+            throws HyracksDataException {
+        int requiredFreeSpace = calculatePhysicalSpace(fta, idx);
+        int frameId = findAvailableFrame(requiredFreeSpace);
+        if (frameId < 0) {
+            if (!canBeInsertedAfterCleanUpFragmentation(requiredFreeSpace)) {
+                return false;
+            }
+            reOrganizeFrames();
+            statsReOrg++;
+            frameId = findAvailableFrame(requiredFreeSpace);
+            if (frameId < 0) {
+                // defensive: never index frames with -1
+                return false;
+            }
+        }
+        accessor.reset(frames.get(frameId));
+        assert accessor.getContiguousFreeSpace() >= requiredFreeSpace;
+        int tid = accessor.append(fta, idx);
+        assert tid >= 0;
+        tuplePointer.reset(frameId, tid);
+        if (accessor.getContiguousFreeSpace() > minFreeSpace) {
+            policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace());
+        }
+        numTuples++;
+        return true;
+    }
+
+    /** Compacts every frame and re-registers all of them with the policy using their new contiguous free space. */
+    private void reOrganizeFrames() {
+        policy.reset();
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            accessor.reOrganizeBuffer();
+            policy.pushNewFrame(i, accessor.getContiguousFreeSpace());
+        }
+    }
+
+    /** Whether some frame has at least {@code requiredFreeSpace} bytes once its deleted space is reclaimed. */
+    private boolean canBeInsertedAfterCleanUpFragmentation(int requiredFreeSpace) {
+        for (int i = 0; i < frames.size(); i++) {
+            accessor.reset(frames.get(i));
+            if (accessor.getTotalFreeSpace() >= requiredFreeSpace) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns a frame id with room for {@code requiredFreeSpace} bytes: a slot from the policy, or a newly allocated,
+     * freshly formatted frame from the pool.
+     *
+     * @return the frame id, or -1 if the pool cannot supply a frame
+     */
+    private int findAvailableFrame(int requiredFreeSpace) throws HyracksDataException {
+        int frameId = policy.popBestFit(requiredFreeSpace);
+        if (frameId >= 0) {
+            return frameId;
+        }
+
+        int frameSize = calculateMinFrameSizeToPlaceTuple(requiredFreeSpace, pool.getMinFrameSize());
+        ByteBuffer buffer = pool.allocateFrame(frameSize);
+        if (buffer == null) {
+            return -1;
+        }
+        accessor.clear(buffer);
+        frames.add(buffer);
+        return frames.size() - 1;
+    }
+
+    /**
+     * Smallest multiple of {@code minFrameSize} that holds {@code requiredFreeSpace} bytes plus the frame trailer.
+     * The previous formula only reserved 4 trailer bytes, so for some tuple sizes the new frame was 4 bytes short.
+     */
+    private static int calculateMinFrameSizeToPlaceTuple(int requiredFreeSpace, int minFrameSize) {
+        int requiredCapacity = requiredFreeSpace + FRAME_TRAILER_SIZE;
+        return ((requiredCapacity + minFrameSize - 1) / minFrameSize) * minFrameSize;
+    }
+
+    private static int calculatePhysicalSpace(IFrameTupleAccessor fta, int idx) {
+        // 4 bytes to store the offset
+        return 4 + fta.getTupleLength(idx);
+    }
+
+    private static int calculateMinFreeSpace(RecordDescriptor recordDescriptor) {
+        // + 4 for the tuple offset
+        return recordDescriptor.getFieldCount() * 4 + 4;
+    }
+
+    @Override
+    public void deleteTuple(TuplePointer tuplePointer) throws HyracksDataException {
+        accessor.reset(frames.get(tuplePointer.frameIndex));
+        accessor.delete(tuplePointer.tupleIndex);
+        numTuples--;
+    }
+
+    @Override
+    public void close() {
+        pool.close();
+        policy.reset();
+        frames.clear();
+        numTuples = 0;
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("VariableTupleMemoryManager has reorganized " + statsReOrg + " times");
+        }
+        statsReOrg = 0;
+    }
+
+    @Override
+    public ITupleBufferAccessor getTupleAccessor() {
+        return new ITupleBufferAccessor() {
+            private IAppendDeletableFrameTupleAccessor bufferAccessor = new DeletableFrameTupleAppender(
+                    recordDescriptor);
+            private int tid;
+
+            @Override
+            public void reset(TuplePointer tuplePointer) {
+                bufferAccessor.reset(frames.get(tuplePointer.frameIndex));
+                tid = tuplePointer.tupleIndex;
+            }
+
+            @Override
+            public ByteBuffer getTupleBuffer() {
+                return bufferAccessor.getBuffer();
+            }
+
+            @Override
+            public int getTupleStartOffset() {
+                return bufferAccessor.getTupleStartOffset(tid);
+            }
+
+            @Override
+            public int getTupleLength() {
+                return bufferAccessor.getTupleLength(tid);
+            }
+
+            @Override
+            public int getAbsFieldStartOffset(int fieldId) {
+                return bufferAccessor.getAbsoluteFieldStartOffset(tid, fieldId);
+            }
+
+            @Override
+            public int getFieldLength(int fieldId) {
+                return bufferAccessor.getFieldLength(tid, fieldId);
+            }
+        };
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
new file mode 100644
index 0000000..9b03a77
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/**
+ * This is a special frame which is used in TupleMemoryBuffer.
+ * This frame has a special structure to organize the deleted spaces.
+ * Specifically, the endOffset of the deleted tuple will be set as negative number.
+ * And we add a special <code>deleted_space</code> field at the last 4 bytes to remember how many bytes have been deleted.
+ * <p>
+ * Layout written by this class, from the start of the buffer:
+ * <pre>
+ * [tuple data ...][free space][end offset of tuple n-1] ... [end offset of tuple 0][tuple count][deleted space]
+ * </pre>
+ * The tuple count is stored 4 bytes below {@code FrameHelper.getTupleCountOffset(capacity)}, and the deleted-space
+ * counter occupies the last 4 bytes. NOTE(review): the two are adjacent only if getTupleCountOffset(capacity) is
+ * capacity - 4; confirm. Deleting a tuple keeps its offset slot, so tuple indices stay stable.
+ */
+public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAccessor {
+
+ private static final int SIZE_DELETED_SPACE = 4;
+ private final RecordDescriptor recordDescriptor;
+ private ByteBuffer buffer;
+ // per-frame state cached from the buffer; refreshed by clear()/reset()
+ private int tupleCountOffset;
+ private int tupleCount;
+ private int freeDataEndOffset;
+ private int deletedSpace;
+ private byte[] array; // to speed up the array visit a little
+
+ public DeletableFrameTupleAppender(RecordDescriptor recordDescriptor) {
+ this.recordDescriptor = recordDescriptor;
+ }
+
+ /** Position of the tuple-count slot: 4 bytes below FrameHelper's standard tuple-count offset. */
+ private int getTupleCountOffset() {
+ return FrameHelper.getTupleCountOffset(buffer.capacity()) - SIZE_DELETED_SPACE;
+ }
+
+ /**
+ * End of the data of the last tuple (absolute value, since that tuple may be marked deleted), or 0 for an empty
+ * frame.
+ */
+ private int getFreeDataEndOffset() {
+ return tupleCount == 0 ? 0 : Math.abs(IntSerDeUtils.getInt(array, tupleCountOffset - tupleCount * 4));
+ }
+
+ /**
+ * Writes {@code offset} into the end-offset slot of the last tuple (index tupleCount - 1).
+ * NOTE(review): with tupleCount == 0 this writes to tupleCountOffset, i.e. the tuple-count slot; the only caller
+ * that can reach that case (reOrganizeBuffer) passes 0 there.
+ */
+ private void setFreeDataEndOffset(int offset) {
+ assert (offset >= 0);
+ IntSerDeUtils.putInt(array, tupleCountOffset - tupleCount * 4, offset);
+ }
+
+ private void setTupleCount(int count) {
+ IntSerDeUtils.putInt(array, tupleCountOffset, count);
+ }
+
+ /** Stores the deleted-bytes counter in the last 4 bytes of the buffer. */
+ private void setDeleteSpace(int count) {
+ IntSerDeUtils.putInt(array, buffer.capacity() - SIZE_DELETED_SPACE, count);
+ }
+
+ /** Tuple count as stored in the buffer (includes tuples marked deleted). */
+ private int getPhysicalTupleCount() {
+ return IntSerDeUtils.getInt(array, tupleCountOffset);
+ }
+
+ private int getDeletedSpace() {
+ return IntSerDeUtils.getInt(array, buffer.capacity() - SIZE_DELETED_SPACE);
+ }
+
+ /**
+ * Binds this accessor to {@code buffer} and formats it as an empty frame (zero tuples, zero deleted bytes).
+ */
+ @Override
+ public void clear(ByteBuffer buffer) throws HyracksDataException {
+ this.buffer = buffer;
+ this.array = buffer.array();
+ tupleCountOffset = getTupleCountOffset();
+ setTupleCount(0);
+ setDeleteSpace(0);
+ resetCounts();
+ }
+
+ /**
+ * Binds this accessor to an already formatted {@code buffer} and reloads the cached counters from it.
+ */
+ @Override
+ public void reset(ByteBuffer buffer) {
+ this.buffer = buffer;
+ this.array = buffer.array();
+ tupleCountOffset = getTupleCountOffset();
+ resetCounts();
+ }
+
+ /** Reloads deletedSpace, tupleCount and freeDataEndOffset from the bound buffer (order matters: tupleCount first). */
+ private void resetCounts() {
+ deletedSpace = getDeletedSpace();
+ tupleCount = getPhysicalTupleCount();
+ freeDataEndOffset = getFreeDataEndOffset();
+ }
+
+ /**
+ * Append the record into the frame. This method will not validate the space, please make sure space is enough
+ * by calling {@link #getContiguousFreeSpace()}
+ *
+ * @param tupleAccessor
+ * @param tIndex
+ * @return the index of the appended tuple within this frame
+ * @throws HyracksDataException
+ */
+ @Override
+ public int append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
+ byte[] src = tupleAccessor.getBuffer().array();
+ int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
+ int length = tupleAccessor.getTupleLength(tIndex);
+ System.arraycopy(src, tStartOffset, array, freeDataEndOffset, length);
+ setTupleCount(++tupleCount);
+ freeDataEndOffset += length;
+ // the new tuple's end offset goes into the slot for index tupleCount - 1
+ setFreeDataEndOffset(freeDataEndOffset);
+ return tupleCount - 1;
+ }
+
+ /**
+ * Marks tuple {@code tupleIndex} as deleted by negating its end offset, and adds its length to the deleted-space
+ * counter. Already-deleted tuples (non-positive end offset) are ignored. The data and slot stay in place until
+ * {@link #reOrganizeBuffer()}.
+ */
+ @Override
+ public void delete(int tupleIndex) {
+ int endOffset = getTupleEndOffset(tupleIndex);
+ if (endOffset > 0) {
+ setTupleEndOffset(tupleIndex, -endOffset);
+ deletedSpace += endOffset - getTupleStartOffset(tupleIndex);
+ setDeleteSpace(deletedSpace);
+ }
+ }
+
+ /**
+ * Compacts live tuple data towards offset 0 and resets the deleted-space counter. Trailing deleted tuples are
+ * dropped entirely (tupleCount shrinks). A deleted tuple in the middle keeps its slot but becomes zero-length:
+ * its end offset is set to the end of the previous tuple, so the indices of live tuples are unchanged.
+ * NOTE(review): those middle slots are written as non-negative, so afterwards they no longer read as deleted.
+ */
+ @Override
+ public void reOrganizeBuffer() {
+ if (deletedSpace <= 0) {
+ return;
+ }
+ reclaimDeletedEnding();
+
+ freeDataEndOffset = 0;
+ // endOffset holds the ORIGINAL (pre-compaction) end of the previous tuple, i.e. the original start of tuple i
+ int endOffset = 0;
+ for (int i = 0; i < tupleCount; i++) {
+ int startOffset = Math.abs(endOffset);
+ endOffset = getTupleEndOffset(i);
+ if (endOffset >= 0) {
+ int length = endOffset - startOffset;
+ assert ( length >= 0);
+ if (freeDataEndOffset != startOffset) {
+ System.arraycopy(array, startOffset, array, freeDataEndOffset, length);
+ }
+ freeDataEndOffset += length;
+ }
+ setTupleEndOffset(i, freeDataEndOffset);
+ }
+ setFreeDataEndOffset(freeDataEndOffset);
+ deletedSpace = 0;
+ setDeleteSpace(0);
+ }
+
+ /** Drops deleted tuples at the end of the slot array by lowering the tuple count. */
+ private void reclaimDeletedEnding() {
+ for (int i = tupleCount - 1; i >= 0; i--) {
+ int endOffset = getTupleEndOffset(i);
+ if (endOffset < 0) {
+ tupleCount--;
+ } else {
+ break;
+ }
+ }
+ setTupleCount(tupleCount);
+ }
+
+ /** Contiguous free space plus the bytes of deleted tuples, which reOrganizeBuffer() can reclaim. */
+ @Override
+ public int getTotalFreeSpace() {
+ return getContiguousFreeSpace() + deletedSpace;
+ }
+
+ /**
+ * Bytes between the end of the tuple data and the start of the end-offset slot array. Appending a tuple
+ * consumes its length plus 4 bytes for its new slot.
+ */
+ @Override
+ public int getContiguousFreeSpace() {
+ return getTupleCountOffset() - tupleCount * 4 - freeDataEndOffset;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return recordDescriptor.getFieldCount();
+ }
+
+ /** Size of the per-tuple field-end-offset header: 4 bytes per field. */
+ @Override
+ public int getFieldSlotsLength() {
+ return recordDescriptor.getFieldCount() * 4;
+ }
+
+ /** End of field {@code fIdx}, relative to the start of the tuple's field data (after the field slots). */
+ @Override
+ public int getFieldEndOffset(int tupleIndex, int fIdx) {
+ return IntSerDeUtils.getInt(array, getTupleStartOffset(tupleIndex) + fIdx * 4);
+ }
+
+ /** Start of field {@code fIdx}, relative to the start of the tuple's field data; 0 for the first field. */
+ @Override
+ public int getFieldStartOffset(int tupleIndex, int fIdx) {
+ return fIdx == 0 ? 0 : IntSerDeUtils.getInt(array, getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+ }
+
+ @Override
+ public int getFieldLength(int tupleIndex, int fIdx) {
+ return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+ }
+
+ /**
+ * Length of tuple {@code tupleIndex}.
+ * NOTE(review): for a tuple marked deleted this returns the negated length (a negative number).
+ */
+ @Override
+ public int getTupleLength(int tupleIndex) {
+ int endOffset = getTupleEndOffset(tupleIndex);
+ if (endOffset < 0) {
+ return endOffset + getTupleStartOffset(tupleIndex);
+ }
+ return endOffset - getTupleStartOffset(tupleIndex);
+ }
+
+ /** Raw stored end offset of the tuple; negative if the tuple is marked deleted. */
+ @Override
+ public int getTupleEndOffset(int tupleIndex) {
+ return IntSerDeUtils.getInt(array, tupleCountOffset - 4 * (tupleIndex + 1));
+ }
+
+ private void setTupleEndOffset(int tupleIndex, int offset) {
+ IntSerDeUtils.putInt(array, tupleCountOffset - 4 * (tupleIndex + 1), offset);
+ }
+
+ /** Start of the tuple: the previous tuple's end (absolute value, since it may be deleted), or 0 for tuple 0. */
+ @Override
+ public int getTupleStartOffset(int tupleIndex) {
+ int offset = tupleIndex == 0 ? 0 : IntSerDeUtils.getInt(array, tupleCountOffset - 4 * tupleIndex);
+ return Math.abs(offset);
+ }
+
+ /** Absolute position of field {@code fIdx} in the buffer: tuple start + field slots + relative field start. */
+ @Override
+ public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+ return getTupleStartOffset(tupleIndex) + getFieldSlotsLength() + getFieldStartOffset(tupleIndex, fIdx);
+ }
+
+ /** Number of tuple slots, including those marked deleted. */
+ @Override
+ public int getTupleCount() {
+ return tupleCount;
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java
new file mode 100644
index 0000000..12ba72f
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupFrameAccessor.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * This {@code GroupFrameAccessor} accesses a group of logical frames which are stored in one physical,
+ * contiguous ByteBuffer. It is used with a RunFileReader, which can read several frames at once, and this
+ * accessor parses the returned data as one frame. In the caller's view there is only one frame, which
+ * simplifies the caller's work.
+ * <p>
+ * Tuple indices are global across the group. Each accessor call maps a global index to the inner frame that holds
+ * it and to the tuple's index within that frame.
+ */
+public class GroupFrameAccessor implements IFrameTupleAccessor {
+
+    /**
+     * One inner frame of the group. {@code tupleCount} is cumulative: the number of tuples in this frame and all
+     * frames before it. Instances compare against a global tuple index by that cumulative count, so the list can be
+     * searched with {@link Collections#binarySearch(List, Object)}.
+     */
+    private static class InnerFrameInfo implements Comparable<Integer> {
+        final int start;
+        final int length;
+        final int tupleCount;
+
+        InnerFrameInfo(int start, int length, int tupleCount) {
+            this.start = start;
+            this.length = length;
+            this.tupleCount = tupleCount;
+        }
+
+        @Override
+        public int compareTo(Integer o) {
+            return Integer.compare(tupleCount, o);
+        }
+    }
+
+    private final RecordDescriptor recordDescriptor;
+    private final int minFrameSize;
+    private final FrameTupleAccessor frameTupleAccessor;
+    private final List<InnerFrameInfo> innerFrameInfos;
+    /** Global index resolved by the last lookup, and the inner frame it was found in (-1 after reset). */
+    private int lastTupleIndex;
+    private int lastFrameId;
+    private ByteBuffer buffer;
+
+    public GroupFrameAccessor(int minFrameSize, RecordDescriptor recordDescriptor) {
+        this.minFrameSize = minFrameSize;
+        this.recordDescriptor = recordDescriptor;
+        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.innerFrameInfos = new ArrayList<>();
+    }
+
+    @Override
+    public int getFieldCount() {
+        return recordDescriptor.getFieldCount();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return frameTupleAccessor.getFieldSlotsLength();
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getFieldEndOffset(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getFieldStartOffset(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getFieldLength(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        return frameTupleAccessor.getTupleLength(resetSubTupleAccessor(tupleIndex));
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return frameTupleAccessor.getTupleEndOffset(resetSubTupleAccessor(tupleIndex));
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return frameTupleAccessor.getTupleStartOffset(resetSubTupleAccessor(tupleIndex));
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameTupleAccessor.getAbsoluteFieldStartOffset(resetSubTupleAccessor(tupleIndex), fIdx);
+    }
+
+    /** Total number of tuples in all complete inner frames parsed by the last {@link #reset(ByteBuffer)}. */
+    @Override
+    public int getTupleCount() {
+        return innerFrameInfos.size() > 0 ? innerFrameInfos.get(innerFrameInfos.size() - 1).tupleCount : 0;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    /**
+     * Binds to {@code buffer} and parses the complete inner frames in [0, limit). The buffer position is left where
+     * parsing stopped.
+     */
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+        this.lastTupleIndex = -1;
+        this.lastFrameId = -1;
+        parseGroupedBuffer(0, buffer.limit());
+    }
+
+    /**
+     * Walks the inner frames in [start, stop), recording each frame's location and cumulative tuple count. Parsing
+     * stops at a zero-size header (the run is consumed) or at a frame that extends past {@code stop} (partially read).
+     */
+    private void parseGroupedBuffer(int start, int stop) {
+        this.innerFrameInfos.clear();
+        int i = start;
+        while (i < stop) {
+            int unitSize = FrameHelper.deserializeNumOfMinFrame(buffer, i) * minFrameSize;
+            if (unitSize == 0) { // run consumed.
+                break;
+            }
+            if (i + unitSize > stop) { // contains future partial run, stop here
+                break;
+            }
+            frameTupleAccessor.reset(buffer, i, unitSize);
+            this.innerFrameInfos
+                    .add(new InnerFrameInfo(i, unitSize, getTupleCount() + frameTupleAccessor.getTupleCount()));
+            i += unitSize;
+        }
+        buffer.position(i); // reading stops here.
+    }
+
+    /**
+     * Points {@code frameTupleAccessor} at the inner frame holding global tuple {@code tupleIndex}.
+     *
+     * @return the tuple's index within that inner frame
+     */
+    private int resetSubTupleAccessor(int tupleIndex) {
+        assert tupleIndex < getTupleCount();
+        if (innerFrameInfos.size() == 1) {
+            // parseGroupedBuffer left frameTupleAccessor on the only frame
+            return tupleIndex;
+        }
+        if (tupleIndex != lastTupleIndex) {
+            int subFrameId = findSubFrame(tupleIndex);
+            InnerFrameInfo info = innerFrameInfos.get(subFrameId);
+            frameTupleAccessor.reset(buffer, info.start, info.length);
+            lastTupleIndex = tupleIndex;
+            lastFrameId = subFrameId;
+        }
+        return lastFrameId > 0 ? lastTupleIndex - innerFrameInfos.get(lastFrameId - 1).tupleCount : lastTupleIndex;
+    }
+
+    /** Index of the first inner frame whose cumulative tuple count is greater than {@code tupleIndex}. */
+    private int findSubFrame(int tupleIndex) {
+        int subFrameId = Collections.binarySearch(innerFrameInfos, tupleIndex);
+        if (subFrameId < 0) {
+            // insertion point = first frame whose cumulative count exceeds tupleIndex
+            return -subFrameId - 1;
+        }
+        // An exact match means tupleIndex is the first tuple of a later frame. Empty inner frames repeat the same
+        // cumulative count, and binarySearch may return any of them, so skip all of them.
+        while (innerFrameInfos.get(subFrameId).tupleCount <= tupleIndex) {
+            subFrameId++;
+        }
+        return subFrameId;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java
new file mode 100644
index 0000000..b273f5b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/GroupVSizeFrame.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class GroupVSizeFrame extends VSizeFrame {
+
+ public GroupVSizeFrame(IHyracksCommonContext ctx, int frameSize)
+ throws HyracksDataException {
+ super(ctx, frameSize);
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ if (buffer.position() > 0 && buffer.hasRemaining()) {
+ movePartialFutureToStartPosition();
+ } else {
+ buffer.clear();
+ }
+ }
+
+ private void movePartialFutureToStartPosition() {
+ assert buffer.hasArray();
+ if (!FrameHelper.hasBeenCleared(buffer, buffer.position())) {
+ buffer.compact();
+ FrameHelper.clearRemainingFrame(buffer, buffer.position()); // mark it to make reset idempotent
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
new file mode 100644
index 0000000..01744f9
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Basically it a union of the {@link IFrameTupleAccessor} and {@link IFrameTupleAppender}.
+ * Moreover, it has the delete function as well.
+ * This is a special TupleAccessor used for TopK sorting.
+ * In HeapSort, or other Tuple-based operators, we need to append the tuple, access the arbitrary previously
+ * inserted tuple, and delete the previously inserted tuple.
+ */
+public interface IAppendDeletableFrameTupleAccessor extends IFrameTupleAccessor {
+
+ /**
+ * Prepare to write on this buffer
+ *
+ * @param buffer
+ * @throws HyracksDataException
+ */
+ void clear(ByteBuffer buffer) throws HyracksDataException;
+
+ /**
+ * Append tuple content to this buffer. Return the new tid as a handle to the caller.
+ *
+ * @param tupleAccessor
+ * @param tIndex
+ * @return
+ * @throws HyracksDataException
+ */
+ int append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException;
+
+ /**
+ * Remove the certain tuple by tid
+ *
+ * @param tid
+ */
+ void delete(int tid);
+
+ /**
+ * Reorganize the space to remove the unused space and make the free space contiguous.
+ */
+ void reOrganizeBuffer();
+
+ /**
+ * @return how many free space in total in the buffer, including the fragmented space
+ */
+ int getTotalFreeSpace();
+
+ /**
+ * @return how many contiguous free space in the buffer.
+ */
+ int getContiguousFreeSpace();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java
new file mode 100644
index 0000000..5d8b252
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/AbstractHeap.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.dataflow.std.util.MathUtil;
+
+public abstract class AbstractHeap implements IHeap<IResetableComparable> {
+ protected static final int NOT_EXIST = -1;
+ protected static final int MAX_INITIAL_CAPACITY = 1024;
+ protected IResetableComparable[] entries;
+ protected IResetableComparable tempEntry;
+ protected IResetableComparableFactory factory;
+ protected int numEntry;
+
+ public AbstractHeap(IResetableComparableFactory factory, int capacity) {
+ capacity = Math.min(MAX_INITIAL_CAPACITY, Math.max(1, capacity));
+ this.entries = new IResetableComparable[capacity];
+ this.numEntry = 0;
+ this.tempEntry = factory.createResetableComparable();
+ this.factory = factory;
+ }
+
+ @Override
+ public void insert(IResetableComparable element) {
+ if (numEntry >= entries.length) {
+ entries = Arrays.copyOf(entries, entries.length * 2);
+ }
+ if (entries[numEntry] == null) {
+ entries[numEntry] = factory.createResetableComparable();
+ }
+ entries[numEntry++].reset(element);
+ bubbleUp(numEntry - 1);
+ }
+
+ protected abstract void bubbleUp(int i);
+
+ protected abstract void trickleDown(int i);
+
+ protected void swap(int cid, int pid) {
+ tempEntry.reset(entries[cid]);
+ entries[cid].reset(entries[pid]);
+ entries[pid].reset(tempEntry);
+ }
+
+ protected int compareTo(int i, int j) {
+ return entries[i].compareTo(entries[j]);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return numEntry == 0;
+ }
+
+ @Override
+ public void reset() {
+ for (int i = 0; i < numEntry; i++) {
+ entries[i] = null;
+ }
+ numEntry = 0;
+ }
+
+ /**
+ * By getting the entries it can manipulate the entries which may violate the Heap property.
+ * Use with care.
+ *
+ * @return
+ */
+ @Deprecated
+ public IResetableComparable[] getEntries() {
+ return entries;
+ }
+
+ @Override
+ public int getNumEntries() {
+ return numEntry;
+ }
+
+ protected int getLevel(int cid) {
+ return MathUtil.log2Floor(cid + 1);
+ }
+
+ static int getParentId(int cid) {
+ return cid < 1 ? NOT_EXIST : (cid - 1) / 2;
+ }
+
+ static int getLeftChild(int id, int numEntry) {
+ int cid = id * 2 + 1;
+ return cid >= numEntry ? NOT_EXIST : cid;
+ }
+
+ protected int getLeftChild(int id) {
+ return getLeftChild(id, numEntry);
+ }
+
+ static int getRightChild(int id, int numEntry) {
+ int cid = id * 2 + 2;
+ return cid >= numEntry ? NOT_EXIST : cid;
+ }
+
+ protected int getRightChild(int id) {
+ return getRightChild(id, numEntry);
+ }
+
+ protected int getGrandParentId(int id) {
+ int pid = getParentId(id);
+ return pid == NOT_EXIST ? NOT_EXIST : getParentId(pid);
+ }
+
+ protected boolean isDirectChild(int id, int childId) {
+ return id == getParentId(childId);
+ }
+
+ protected int getMinChild(int id) {
+ int min = NOT_EXIST;
+ if (id != NOT_EXIST) {
+ min = getLeftChild(id, numEntry);
+ if (min != NOT_EXIST) {
+ int rightCid = getRightChild(id, numEntry);
+ if (rightCid != NOT_EXIST) {
+ min = compareTo(rightCid, min) < 0 ? rightCid : min;
+ }
+ }
+ }
+ return min;
+ }
+
+ protected int getMaxChild(int id) {
+ int max = NOT_EXIST;
+ if (id != NOT_EXIST) {
+ max = getLeftChild(id, numEntry);
+ if (max != NOT_EXIST) {
+ int rightCid = getRightChild(id, numEntry);
+ if (rightCid != NOT_EXIST) {
+ max = compareTo(rightCid, max) > 0 ? rightCid : max;
+ }
+ }
+ }
+ return max;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java
new file mode 100644
index 0000000..f650f12
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IHeap.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
/**
 * A heap of elements of type {@code T}.
 *
 * @param <T> the element type
 */
public interface IHeap<T> {
    /**
     * Inserts a new element into the heap.
     *
     * @param element the element to insert
     */
    void insert(T element);

    /**
     * @return {@code true} if the heap contains no elements, {@code false} otherwise
     */
    boolean isEmpty();

    /**
     * Removes all elements from the heap.
     */
    void reset();

    /**
     * Returns the number of elements currently stored in the heap.
     *
     * @return the number of stored elements
     */
    int getNumEntries();

}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java
new file mode 100644
index 0000000..d932a0e
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMaxHeap.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IMaxHeap<T> extends IHeap<T> {
+ /**
+ * Removes and returns the largest element in the tree.
+ * Make sure the heap is not empty (by {@link #isEmpty()}) before calling this method
+ *
+ * @param result
+ */
+ void getMax(T result);
+
+ /**
+ * Returns (and does NOT remove) the largest element in the tree
+ *
+ * @param result is the object that will eventually contain maximum entry
+ * pointer
+ */
+ void peekMax(T result);
+
+ /**
+ * Removes the current max and insert a new element.
+ * Normally it is a faster way to call getMax() && insert() together
+ *
+ * @param newElement
+ */
+ void replaceMax(T newElement);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java
new file mode 100644
index 0000000..784c492
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinHeap.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IMinHeap<T> extends IHeap<T> {
+ /**
+ * Removes and returns the smallest element in the tree.
+ * Make sure the heap is not empty (by {@link #isEmpty()}) before calling this method
+ *
+ * @param result
+ */
+ void getMin(T result);
+
+ /**
+ * Returns (and does NOT remove) the smallest element in the tree
+ *
+ * @param result is the object that will eventually contain minimum entry
+ * pointer
+ */
+ void peekMin(T result);
+
+ /**
+ * Removes the current min and insert a new element.
+ * Normally it is a faster way to call getMin() && insert() together
+ *
+ * @param newElement
+ */
+ void replaceMin(T newElement);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java
new file mode 100644
index 0000000..8225d88
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/structures/IMinMaxHeap.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.structures;
+
+public interface IMinMaxHeap<T> extends IMinHeap<T>, IMaxHeap<T> {
+}
\ No newline at end of file