You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:23 UTC
[06/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
deleted file mode 100644
index d78af12..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-/**
- * @author pouria This class implements the run generator for sorting with
- * replacement selection, where there is no limit on the output, i.e.
- * the whole data should be sorted. A SortMinHeap is used as the
- * selectionTree to decide the order of writing tuples into the runs,
- * while memory manager is based on a binary search tree to allocate
- * tuples in the memory. The overall process is as follows: - Read the
- * input data frame by frame. For each tuple T in the current frame: -
- * Try to allocate a memory slot for writing T along with the attached
- * header/footer (for memory management purpose) - If T can not be
- * allocated, try to output as many tuples, currently resident in
- * memory, as needed so that a free slot, large enough to hold T, gets
- * created. MinHeap decides about which tuple should be sent to the
- * output at each step. - Write T into the memory - Calculate the runID
- * of T (based on the last output tuple for the current run). It is
- * either the current run or the next run. Also calculate Poorman's
- * Normalized Key (PNK) for T, to make comparisons faster later. -
- * Create a heap element for T, containing: its runID, the slot pointer
- * to its memory location, and its PNK. - Insert the created heap
- * element into the heap - Upon closing, write all the tuples, currently
- * resident in memory, into their corresponding run(s). Again min heap
- * decides about which tuple is the next for output.
- * OptimizedSortOperatorDescriptor will merge the generated runs, to
- * generate the final sorted output of the data.
- */
-public class OptimizedExternalSortRunGenerator implements IRunGenerator {
- private final IHyracksTaskContext ctx;
- private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final List<IFrameReader> runs;
-
- private ISelectionTree sTree;
- private IMemoryManager memMgr;
-
- private final int memSize;
- private FrameTupleAccessor inputAccessor; // Used to read tuples in
- // nextFrame()
- private FrameTupleAppender outputAppender; // Used to write tuple to the
- // dedicated output buffer
- private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
- // into run(s)
- private FrameTupleAccessor lastRecordAccessor; // Used to read last output
- // record from the output
- // buffer
- private int lastTupleIx; // Holds index of last output tuple in the
- // dedicated output buffer
- private Slot allocationPtr; // Contains the ptr to the allocated memory slot
- // by the memory manager for the new tuple
- private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
- // the selectionTree to output
- private int[] sTreeTop;
-
- private RunFileWriter writer;
-
- private boolean newRun;
- private int curRunId;
-
- public OptimizedExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, int memSize) {
- this.ctx = ctx;
- this.sortFields = sortFields;
- nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- this.comparatorFactories = comparatorFactories;
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- this.runs = new LinkedList<IFrameReader>();
- this.memSize = memSize;
- }
-
- @Override
- public void open() throws HyracksDataException {
- runs.clear();
- inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputBuffer = ctx.allocateFrame();
- outputAppender.reset(outputBuffer, true);
- lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-
- this.memMgr = new BSTMemMgr(ctx, memSize);
- this.sTree = new SortMinHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
- this.allocationPtr = new Slot();
- this.outputedTuple = new Slot();
- this.sTreeTop = new int[] { -1, -1, -1, -1 };
- curRunId = -1;
- openNewRun();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inputAccessor.reset(buffer);
- byte[] bufferArray = buffer.array();
- int tupleCount = inputAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; ++i) {
- allocationPtr.clear();
- int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
- memMgr.allocate(tLength, allocationPtr);
- while (allocationPtr.isNull()) {
- int unAllocSize = -1;
- while (unAllocSize < tLength) {
- unAllocSize = outputRecord();
- if (unAllocSize < 1) {
- throw new HyracksDataException(
- "Unable to allocate space for the new tuple, while there is no more tuple to output");
- }
- }
- memMgr.allocate(tLength, allocationPtr);
- }
- memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
- int runId = getRunId(inputAccessor, i);
- int pnk = getPNK(inputAccessor, i, bufferArray);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- while (!sTree.isEmpty()) { // Outputting remaining elements in the
- // selectionTree
- outputRecord();
- }
- if (outputAppender.getTupleCount() > 0) { // Writing out very last
- // resident records to file
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender.reset(outputBuffer, true);
- writer.close();
- runs.add(writer.createReader());
- memMgr.close();
- }
-
- public List<IFrameReader> getRuns() {
- return runs;
- }
-
- private int outputRecord() throws HyracksDataException {
- outputedTuple.clear();
- sTree.getMin(sTreeTop);
- if (!isEntryValid(sTreeTop)) {
- throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
- }
-
- if (sTreeTop[SortMinHeap.RUN_ID_IX] != curRunId) { // We need to switch
- // runs
- openNewRun();
- }
-
- int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
- int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
- // append to
- // the
- // tupleAppender
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
- throw new HyracksDataException("Can not append to the ouput buffer in sort");
- }
- lastTupleIx = 0;
- } else {
- lastTupleIx++;
- }
- outputedTuple.set(tFrameIx, tOffset);
- newRun = false;
- return memMgr.unallocate(outputedTuple);
-
- }
-
- private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
- // Moved buffInArray out for better performance (not converting for each and every tuple)
- int sfIdx = sortFields[0];
- int tStart = fta.getTupleStartOffset(tIx);
- int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
- int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
- int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
- return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
- }
-
- private int getRunId(FrameTupleAccessor fta, int tupIx) throws HyracksDataException {
- // Comparing current record to last output record, it decides about current record's runId
- if (newRun) { // Very first record for a new run
- return curRunId;
- }
-
- byte[] lastRecBuff = outputBuffer.array();
- lastRecordAccessor.reset(outputBuffer);
- int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
-
- ByteBuffer fr2 = fta.getBuffer();
- byte[] curRecBuff = fr2.array();
- int r2StartOffset = fta.getTupleStartOffset(tupIx);
-
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
- int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
- int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
- int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
- if (c != 0) {
- if (c <= 0) {
- return curRunId;
- } else {
- return (curRunId + 1);
- }
- }
- }
- return curRunId;
- }
-
- private void openNewRun() throws HyracksDataException {
- if (writer != null) { // There is a prev run, so flush its tuples and
- // close it first
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender.reset(outputBuffer, true);
- writer.close();
- runs.add(writer.createReader());
- }
-
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalSortRunGenerator.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- curRunId++;
- newRun = true;
- lastTupleIx = -1;
- }
-
- private boolean isEntryValid(int[] entry) {
- return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
deleted file mode 100644
index 5b01fb8..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-/**
- * @author pouria
- * This class implements the run generator for sorting with replacement
- * selection, where there is a limit on the output, i.e. we are looking
- * for top-k tuples (first k smallest tuples w.r.t sorting keys).
- * A SortMinMaxHeap is used as the selectionTree to decide the order of
- * writing tuples into the runs, and also to prune tuples (if possible).
- * Memory manager is based on a binary search tree and is used to
- * allocate memory slots for tuples.
- * The overall process is as follows (Assuming that the limit is K):
- * - Read the input data frame by frame. For each tuple T in the current
- * frame:
- * - If currentRun R has reached the limit of K on the size, and (T >
- * maximum tuple of R), then ignore T.
- * - Otherwise, try to allocate a memory slot for writing T along with
- * the attached header/footer (for memory management purpose)
- * - If T can not be allocated, try to output as many tuples, currently
- * resident in memory, as needed so that a free slot, large enough to
- * hold T, gets created. MinMaxHeap decides about which tuple should be
- * sent to the output at each step.
- * - Write T into memory.
- * - Calculate the runID of T (based on the last output tuple for the
- * current run). It is either the current run or the next run. Also
- * calculate Poorman's Normalized Key (PNK) for T, to make comparisons
- * faster later.
- * - Create an heap element for T, containing its runID, the slot ptr to
- * its memory location, and its PNK.
- * - If runID is the nextRun, insert the heap element into the heap, and
- * increment the size of nextRun.
- * - If runID is the currentRun, then:
- * - If currentRun has not hit the limit of k, insert the element into
- * the heap, and increase currentRun size. - Otherwise, currentRun has
- * hit the limit of K, while T is less than the max. So discard the
- * current max for the current run (by poping it from the heap and
- * unallocating its memory location) and insert the heap element into
- * the heap. No need to change the currentRun size as we are replacing
- * an old element (the old max) with T.
- * - Upon closing, write all the tuples, currently resident in memory,
- * into their corresponding run(s).
- * - Note that upon opening a new Run R, if size of R (based on stats)
- * is S and (S > K), then (S-K) current maximum tuples of R (which are
- * resident in memory) get discarded at the beginning. MinMax heap can
- * be used to find these tuples.
- */
-public class OptimizedExternalSortRunGeneratorWithLimit implements IRunGenerator {
-
- private final IHyracksTaskContext ctx;
- private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final List<IFrameReader> runs;
-
- private ISelectionTree sTree;
- private IMemoryManager memMgr;
-
- private final int memSize;
- private FrameTupleAccessor inputAccessor; // Used to read tuples in
- // nextFrame()
- private FrameTupleAppender outputAppender; // Used to write tuple to the
- // dedicated output buffer
- private ByteBuffer outputBuffer; // Dedicated output buffer to write tuples
- // into run(s)
- private FrameTupleAccessor lastRecordAccessor; // Used to read last output
- // record from the output
- // buffer
- private FrameTupleAccessor fta2; // Used to read max record
- private final int outputLimit;
- private int curRunSize;
- private int nextRunSize;
- private int lastTupleIx; // Holds index of last output tuple in the
- // dedicated output buffer
- private Slot allocationPtr; // Contains the ptr to the allocated memory slot
- // by the memory manager for the new tuple
- private Slot outputedTuple; // Contains the ptr to the next tuple chosen by
- // the selectionTree to output
- private Slot discard;
- private int[] sTreeTop;
- private int[] peek;
- private RunFileWriter writer;
- private boolean newRun;
- private int curRunId;
-
- public OptimizedExternalSortRunGeneratorWithLimit(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, int memSize, int limit) {
-
- this.ctx = ctx;
- this.sortFields = sortFields;
- nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- this.comparatorFactories = comparatorFactories;
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- this.runs = new LinkedList<IFrameReader>();
- this.memSize = memSize;
-
- this.outputLimit = limit;
- }
-
- @Override
- public void open() throws HyracksDataException {
- runs.clear();
- inputAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputBuffer = ctx.allocateFrame();
- outputAppender.reset(outputBuffer, true);
- lastRecordAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- this.memMgr = new BSTMemMgr(ctx, memSize);
- this.sTree = new SortMinMaxHeap(ctx, sortFields, comparatorFactories, recordDescriptor, memMgr);
- this.allocationPtr = new Slot();
- this.outputedTuple = new Slot();
- this.sTreeTop = new int[] { -1, -1, -1, -1 };
- this.peek = new int[] { -1, -1, -1, -1 };
- this.discard = new Slot();
-
- curRunId = -1;
- curRunSize = 0;
- nextRunSize = 0;
- openNewRun();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- inputAccessor.reset(buffer);
- byte[] bufferArray = buffer.array();
- int tupleCount = inputAccessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- if (curRunSize >= outputLimit) {
- sTree.peekMax(peek);
- if (isEntryValid(peek)
- && compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX],
- peek[SortMinMaxHeap.OFFSET_IX]) >= 0) {
- continue;
- }
- }
-
- allocationPtr.clear();
- int tLength = inputAccessor.getTupleEndOffset(i) - inputAccessor.getTupleStartOffset(i);
- memMgr.allocate(tLength, allocationPtr);
- while (allocationPtr.isNull()) {
- int unAllocSize = -1;
- while (unAllocSize < tLength) {
- unAllocSize = outputRecord();
- if (unAllocSize < 1) {
- throw new HyracksDataException(
- "Unable to allocate space for the new tuple, while there is no more tuple to output");
- }
- }
- memMgr.allocate(tLength, allocationPtr);
- }
-
- int pnk = getPNK(inputAccessor, i, bufferArray);
- int runId = getRunId(inputAccessor, i);
- if (runId != curRunId) { // tuple belongs to the next run
- memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- nextRunSize++;
- continue;
- }
- // belongs to the current run
- if (curRunSize < outputLimit) {
- memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- curRunSize++;
- continue;
- }
-
- sTree.peekMax(peek);
- if (compareRecords(inputAccessor, i, peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]) > 0) {
- continue;
- }
- // replacing the max
- sTree.getMax(peek);
- discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
- memMgr.unallocate(discard);
- memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
- int[] entry = new int[] { runId, allocationPtr.getFrameIx(), allocationPtr.getOffset(), pnk };
- sTree.insert(entry);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- while (!sTree.isEmpty()) { // Outputting remaining elements in the
- // selectionTree
- outputRecordForClose();
- }
-
- if (outputAppender.getTupleCount() > 0) { // Writing out very last
- // resident records to file
- FrameUtils.flushFrame(outputBuffer, writer);
- }
-
- writer.close();
- runs.add(writer.createReader());
- memMgr.close();
- }
-
- public List<IFrameReader> getRuns() {
- return runs;
- }
-
- private int outputRecord() throws HyracksDataException {
- outputedTuple.clear();
- sTree.getMin(sTreeTop);
- if (!isEntryValid(sTreeTop)) {
- throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
- }
- int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
- int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
- if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] == curRunId) {
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can
- // not
- // append
- // to
- // the
- // tupleAppender
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
- throw new HyracksDataException("Can not append to the ouput buffer in sort");
- }
- lastTupleIx = 0;
- } else {
- lastTupleIx++;
- }
- outputedTuple.set(tFrameIx, tOffset);
- newRun = false;
- return memMgr.unallocate(outputedTuple);
- }
- // Minimum belongs to the next Run
- openNewRun();
- int popCount = curRunSize - outputLimit;
- int l = 0;
- int maxFreedSpace = 0;
- for (int p = 0; p < popCount; p++) {
- sTree.getMax(peek);
- if (!isEntryValid(peek)) {
- throw new HyracksDataException("Invalid Maximum extracted from MinMaxHeap");
- }
- discard.set(peek[SortMinMaxHeap.FRAME_IX], peek[SortMinMaxHeap.OFFSET_IX]);
- l = memMgr.unallocate(discard);
- if (l > maxFreedSpace) {
- maxFreedSpace = l;
- }
- curRunSize--;
- }
-
- if (maxFreedSpace != 0) {
- return maxFreedSpace;
- }
- // No max discarded (We just flushed out the prev run, so the output
- // buffer should be clear)
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
- // append to
- // the
- // tupleAppender
- throw new HyracksDataException("Can not append to the ouput buffer in sort");
- }
- lastTupleIx = 0;
- outputedTuple.set(tFrameIx, tOffset);
- newRun = false;
- return memMgr.unallocate(outputedTuple);
- }
-
- private void outputRecordForClose() throws HyracksDataException {
- sTree.getMin(sTreeTop);
- if (!isEntryValid(sTreeTop)) {
- throw new HyracksDataException("Invalid outputed tuple (Top of the selection tree is invalid)");
- }
- int tFrameIx = sTreeTop[SortMinHeap.FRAME_IX];
- int tOffset = sTreeTop[SortMinHeap.OFFSET_IX];
- if (sTreeTop[SortMinMaxHeap.RUN_ID_IX] != curRunId) {
- openNewRun();
- }
-
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) { // Can not
- // append to
- // the
- // tupleAppender
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!memMgr.readTuple(tFrameIx, tOffset, outputAppender)) {
- throw new HyracksDataException("Can not append to the ouput buffer in sort");
- }
- }
- }
-
- private int getPNK(FrameTupleAccessor fta, int tIx, byte[] buffInArray) {
- // Moved buffInArray out for better performance (not converting for each and every tuple)
- int sfIdx = sortFields[0];
- int tStart = fta.getTupleStartOffset(tIx);
- int f0StartRel = fta.getFieldStartOffset(tIx, sfIdx);
- int f0EndRel = fta.getFieldEndOffset(tIx, sfIdx);
- int f0Start = f0StartRel + tStart + fta.getFieldSlotsLength();
- return (nkc == null ? 0 : nkc.normalize(buffInArray, f0Start, f0EndRel - f0StartRel));
- }
-
- private int getRunId(FrameTupleAccessor fta, int tupIx) throws HyracksDataException {
- // Comparing current record to last output record, it decides about current record's runId
- if (newRun) { // Very first record for a new run
- return curRunId;
- }
-
- byte[] lastRecBuff = outputBuffer.array();
- lastRecordAccessor.reset(outputBuffer);
- int lastStartOffset = lastRecordAccessor.getTupleStartOffset(lastTupleIx);
-
- ByteBuffer fr2 = fta.getBuffer();
- byte[] curRecBuff = fr2.array();
- int r2StartOffset = fta.getTupleStartOffset(tupIx);
-
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : outputBuffer.getInt(lastStartOffset + (fIdx - 1) * 4);
- int f1End = outputBuffer.getInt(lastStartOffset + fIdx * 4);
- int s1 = lastStartOffset + lastRecordAccessor.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
- int c = comparators[f].compare(lastRecBuff, s1, l1, curRecBuff, s2, l2);
- if (c != 0) {
- if (c <= 0) {
- return curRunId;
- } else {
- return (curRunId + 1);
- }
- }
- }
- return curRunId;
- }
-
- // first<sec : -1
- private int compareRecords(FrameTupleAccessor fta1, int ix1, int fix2, int offset2) throws HyracksDataException {
- ByteBuffer buff1 = fta1.getBuffer();
- byte[] recBuff1 = buff1.array();
- int offset1 = fta1.getTupleStartOffset(ix1);
-
- offset2 += BSTNodeUtil.HEADER_SIZE;
- ByteBuffer buff2 = memMgr.getFrame(fix2);
- fta2.reset(buff2);
- byte[] recBuff2 = buff2.array();
-
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : buff1.getInt(offset1 + (fIdx - 1) * 4);
- int f1End = buff1.getInt(offset1 + fIdx * 4);
- int s1 = offset1 + fta1.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : buff2.getInt(offset2 + (fIdx - 1) * 4);
- int f2End = buff2.getInt(offset2 + fIdx * 4);
- int s2 = offset2 + fta2.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
- int c = comparators[f].compare(recBuff1, s1, l1, recBuff2, s2, l2);
-
- if (c != 0) {
- return c;
- }
- }
- return 0;
-
- }
-
- private void openNewRun() throws HyracksDataException {
- if (writer != null) { // There is a prev run, so flush its tuples and
- // close it first
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender.reset(outputBuffer, true);
- writer.close();
- runs.add(writer.createReader());
- }
-
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalSortRunGenerator.class.getSimpleName());
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- curRunId++;
- newRun = true;
- curRunSize = nextRunSize;
- nextRunSize = 0;
- lastTupleIx = -1;
- }
-
- private boolean isEntryValid(int[] entry) {
- return ((entry[SortMinHeap.RUN_ID_IX] > -1) && (entry[SortMinHeap.FRAME_IX] > -1) && (entry[SortMinHeap.OFFSET_IX] > -1));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java
new file mode 100644
index 0000000..c68f1e7
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunAndMaxFrameSizePair.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+
+public class RunAndMaxFrameSizePair {
+ public IFrameReader run;
+ public int maxFrameSize;
+
+ public RunAndMaxFrameSizePair(IFrameReader run, int maxFrameSize) {
+ this.run = run;
+ this.maxFrameSize = maxFrameSize;
+ }
+
+ void updateSize(int newMaxSize){
+ this.maxFrameSize = newMaxSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index cf0d0ad..f013594 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -1,23 +1,23 @@
/*
* Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
-import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -25,52 +25,61 @@ import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
public class RunMergingFrameReader implements IFrameReader {
private final IHyracksTaskContext ctx;
- private final IFrameReader[] runCursors;
- private final List<ByteBuffer> inFrames;
+ private final List<? extends IFrameReader> runCursors;
+ private final List<? extends IFrame> inFrames;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
private final INormalizedKeyComputer nmkComputer;
private final RecordDescriptor recordDesc;
- private final FrameTupleAppender outFrameAppender;
+ private final int topK;
+ private int tupleCount;
+ private FrameTupleAppender outFrameAppender;
private ReferencedPriorityQueue topTuples;
private int[] tupleIndexes;
- private FrameTupleAccessor[] tupleAccessors;
+ private IFrameTupleAccessor[] tupleAccessors;
- public RunMergingFrameReader(IHyracksTaskContext ctx, IFrameReader[] runCursors, List<ByteBuffer> inFrames,
- int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
- RecordDescriptor recordDesc) {
+ public RunMergingFrameReader(IHyracksTaskContext ctx, List<? extends IFrameReader> runs,
+ List<? extends IFrame> inFrames, int[] sortFields, IBinaryComparator[] comparators,
+ INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc) {
+ this(ctx, runs, inFrames, sortFields, comparators, nmkComputer, recordDesc, Integer.MAX_VALUE);
+ }
+
+ public RunMergingFrameReader(IHyracksTaskContext ctx, List<? extends IFrameReader> runs,
+ List<? extends IFrame> inFrames, int[] sortFields, IBinaryComparator[] comparators,
+ INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc, int topK) {
this.ctx = ctx;
- this.runCursors = runCursors;
+ this.runCursors = runs;
this.inFrames = inFrames;
this.sortFields = sortFields;
this.comparators = comparators;
this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
- outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ this.topK = topK;
}
@Override
public void open() throws HyracksDataException {
- tupleAccessors = new FrameTupleAccessor[runCursors.length];
+ tupleCount = 0;
+ tupleAccessors = new IFrameTupleAccessor[runCursors.size()];
+ outFrameAppender = new FrameTupleAppender();
Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), recordDesc, runCursors.length, comparator,
- sortFields, nmkComputer);
- tupleIndexes = new int[runCursors.length];
- for (int i = 0; i < runCursors.length; i++) {
+ topTuples = new ReferencedPriorityQueue(runCursors.size(), comparator, sortFields, nmkComputer);
+ tupleIndexes = new int[runCursors.size()];
+ for (int i = 0; i < runCursors.size(); i++) {
tupleIndexes[i] = 0;
int runIndex = topTuples.peek().getRunid();
- runCursors[runIndex].open();
- if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
- tupleAccessors[runIndex] = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
- tupleAccessors[runIndex].reset(inFrames.get(runIndex));
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ runCursors.get(runIndex).open();
+ if (runCursors.get(runIndex).nextFrame(inFrames.get(runIndex))) {
+ tupleAccessors[runIndex] = new GroupFrameAccessor(ctx.getInitialFrameSize(), recordDesc);
+ tupleAccessors[runIndex].reset(inFrames.get(runIndex).getBuffer());
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors, topTuples);
} else {
closeRun(runIndex, runCursors, tupleAccessors);
topTuples.pop();
@@ -79,20 +88,21 @@ public class RunMergingFrameReader implements IFrameReader {
}
@Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- outFrameAppender.reset(buffer, true);
- while (!topTuples.areRunsExhausted()) {
+ public boolean nextFrame(IFrame outFrame) throws HyracksDataException {
+ outFrameAppender.reset(outFrame, true);
+ while (!topTuples.areRunsExhausted() && tupleCount < topK) {
ReferenceEntry top = topTuples.peek();
int runIndex = top.getRunid();
- FrameTupleAccessor fta = top.getAccessor();
+ IFrameTupleAccessor fta = top.getAccessor();
int tupleIndex = top.getTupleIndex();
if (!outFrameAppender.append(fta, tupleIndex)) {
return true;
+ } else {
+ tupleCount++;
}
-
++tupleIndexes[runIndex];
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+ setNextTopTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors, topTuples);
}
if (outFrameAppender.getTupleCount() > 0) {
@@ -103,14 +113,15 @@ public class RunMergingFrameReader implements IFrameReader {
@Override
public void close() throws HyracksDataException {
- for (int i = 0; i < runCursors.length; ++i) {
+ for (int i = 0; i < runCursors.size(); ++i) {
closeRun(i, runCursors, tupleAccessors);
}
}
- private void setNextTopTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
- boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+ private static void setNextTopTuple(int runIndex, int[] tupleIndexes, List<? extends IFrameReader> runCursors,
+ List<? extends IFrame> inFrames, IFrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
+ throws HyracksDataException {
+ boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors);
if (exists) {
topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
} else {
@@ -119,15 +130,16 @@ public class RunMergingFrameReader implements IFrameReader {
}
}
- private boolean hasNextTuple(int runIndex, int[] tupleIndexes, IFrameReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
- if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+ private static boolean hasNextTuple(int runIndex, int[] tupleIndexes, List<? extends IFrameReader> runCursors,
+ List<? extends IFrame> inFrames, IFrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
+ if (tupleAccessors[runIndex] == null || runCursors.get(runIndex) == null) {
return false;
} else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
- ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
- if (runCursors[runIndex].nextFrame(buf)) {
+ IFrame frame = inFrames.get(runIndex);
+ if (runCursors.get(runIndex).nextFrame(frame)) {
tupleIndexes[runIndex] = 0;
- return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+ tupleAccessors[runIndex].reset(frame.getBuffer());
+ return hasNextTuple(runIndex, tupleIndexes, runCursors, inFrames, tupleAccessors);
} else {
return false;
}
@@ -136,11 +148,12 @@ public class RunMergingFrameReader implements IFrameReader {
}
}
- private void closeRun(int index, IFrameReader[] runCursors, IFrameTupleAccessor[] tupleAccessors)
+ private static void closeRun(int index, List<? extends IFrameReader> runCursors,
+ IFrameTupleAccessor[] tupleAccessors)
throws HyracksDataException {
- if (runCursors[index] != null) {
- runCursors[index].close();
- runCursors[index] = null;
+ if (runCursors.get(index) != null) {
+ runCursors.get(index).close();
+ runCursors.set(index, null);
tupleAccessors[index] = null;
}
}
@@ -153,8 +166,8 @@ public class RunMergingFrameReader implements IFrameReader {
if (nmk1 != nmk2) {
return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
}
- FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+ IFrameTupleAccessor fta1 = tp1.getAccessor();
+ IFrameTupleAccessor fta2 = tp2.getAccessor();
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
int[] tPointers1 = tp1.getTPointers();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
deleted file mode 100644
index 73f99dd..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-/**
- * @author pouria
- * Defines a slot in the memory, which can be a free or used (allocated)
- * slot. Memory is a set of frames, ordered as a list. Each tuple is
- * stored in a slot, where the location of the slot is denoted by a pair
- * of integers:
- * - The index of the frame, in the list of frames in memory. (referred
- * to as frameIx)
- * - The starting offset of the slot, within that specific frame.
- * (referred to as offset)
- */
-public class Slot {
-
- private int frameIx;
- private int offset;
-
- public Slot() {
- this.frameIx = BSTNodeUtil.INVALID_INDEX;
- this.offset = BSTNodeUtil.INVALID_INDEX;
- }
-
- public Slot(int frameIx, int offset) {
- this.frameIx = frameIx;
- this.offset = offset;
- }
-
- public void set(int frameIx, int offset) {
- this.frameIx = frameIx;
- this.offset = offset;
- }
-
- public int getFrameIx() {
- return frameIx;
- }
-
- public void setFrameIx(int frameIx) {
- this.frameIx = frameIx;
- }
-
- public int getOffset() {
- return offset;
- }
-
- public void setOffset(int offset) {
- this.offset = offset;
- }
-
- public boolean isNull() {
- return (frameIx == BSTNodeUtil.INVALID_INDEX) || (offset == BSTNodeUtil.INVALID_INDEX);
- }
-
- public void clear() {
- this.frameIx = BSTNodeUtil.INVALID_INDEX;
- this.offset = BSTNodeUtil.INVALID_INDEX;
- }
-
- public void copy(Slot s) {
- this.frameIx = s.getFrameIx();
- this.offset = s.getOffset();
- }
-
- public String toString() {
- return "(" + frameIx + ", " + offset + ")";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
deleted file mode 100644
index 1cde75f..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * @author pouria
- * Implements a minimum binary heap, used as selection tree, for sort
- * with replacement. This heap structure can only be used as the min
- * heap (no access to the max element). Elements in the heap are
- * compared based on their run numbers, and sorting key(s):
- * Considering two heap elements A and B:
- * if RunNumber(A) > RunNumber(B) then A is larger than B if
- * RunNumber(A) == RunNumber(B), then A is smaller than B, if and only
- * if the value of the sort key(s) in B is greater than A (based on the
- * sort comparator).
- */
-public class SortMinHeap implements ISelectionTree {
-
- static final int RUN_ID_IX = 0;
- static final int FRAME_IX = 1;
- static final int OFFSET_IX = 2;
- private static final int PNK_IX = 3;
- private static final int ELEMENT_SIZE = 4;
- private static final int INIT_ARRAY_SIZE = 512;
-
- private final int[] sortFields;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final FrameTupleAccessor fta1;
- private final FrameTupleAccessor fta2;
- private int[] elements;
- private int nextIx;
- private final IMemoryManager memMgr;
- private int[] top; // Used as a temp variable to access the top, to avoid object creation
-
- public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, IMemoryManager memMgr) {
- this.sortFields = sortFields;
- this.comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- this.comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- this.memMgr = memMgr;
- this.top = new int[ELEMENT_SIZE];
- Arrays.fill(top, -1);
- this.elements = new int[INIT_ARRAY_SIZE];
- Arrays.fill(elements, -1);
- this.nextIx = 0;
- }
-
- /*
- * Assumption (element structure): [RunId][FrameIx][Offset][Poorman NK]
- */
- @Override
- public void getMin(int[] result) throws HyracksDataException {
- if (nextIx == 0) {
- result[0] = result[1] = result[2] = result[3] = -1;
- return;
- }
-
- top = delete(0);
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
- }
- }
-
- @Override
- public void peekMin(int[] result) {
- if (nextIx == 0) {
- result[0] = result[1] = result[2] = result[3] = -1;
- return;
- }
- for (int i = 0; i < ELEMENT_SIZE; i++) {
- result[i] = elements[i];
- }
- }
-
- @Override
- public void insert(int[] e) throws HyracksDataException {
- if (nextIx >= elements.length) {
- elements = Arrays.copyOf(elements, elements.length * 2);
- }
- for (int i = 0; i < ELEMENT_SIZE; i++) {
- elements[nextIx + i] = e[i];
- }
- siftUp(nextIx);
- nextIx += ELEMENT_SIZE;
-
- }
-
- @Override
- public void reset() {
- Arrays.fill(elements, -1);
- nextIx = 0;
- }
-
- @Override
- public boolean isEmpty() {
- return (nextIx < ELEMENT_SIZE);
- }
-
- public int _debugGetSize() {
- return (nextIx > 0 ? (nextIx - 1) / 4 : 0);
- }
-
- private int[] delete(int nix) throws HyracksDataException {
- int[] nv = Arrays.copyOfRange(elements, nix, nix + ELEMENT_SIZE);
- int[] lastElem = removeLast();
-
- if (nextIx == 0) {
- return nv;
- }
-
- for (int i = 0; i < ELEMENT_SIZE; i++) {
- elements[nix + i] = lastElem[i];
- }
- int pIx = getParent(nix);
- if (pIx > -1 && (compare(lastElem, Arrays.copyOfRange(elements, pIx, pIx + ELEMENT_SIZE)) < 0)) {
- siftUp(nix);
- } else {
- siftDown(nix);
- }
- return nv;
- }
-
- private int[] removeLast() {
- if (nextIx < ELEMENT_SIZE) { //this is the very last element
- return new int[] { -1, -1, -1, -1 };
- }
- int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
- Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
- nextIx -= ELEMENT_SIZE;
- return l;
- }
-
- private void siftUp(int nodeIx) throws HyracksDataException {
- int p = getParent(nodeIx);
- if (p < 0) {
- return;
- }
- while (p > -1 && (compare(nodeIx, p) < 0)) {
- swap(p, nodeIx);
- nodeIx = p;
- p = getParent(nodeIx);
- if (p < 0) { // We are at the root
- return;
- }
- }
- }
-
- private void siftDown(int nodeIx) throws HyracksDataException {
- int mix = getMinOfChildren(nodeIx);
- if (mix < 0) {
- return;
- }
- while (mix > -1 && (compare(mix, nodeIx) < 0)) {
- swap(mix, nodeIx);
- nodeIx = mix;
- mix = getMinOfChildren(nodeIx);
- if (mix < 0) { // We hit the leaf level
- return;
- }
- }
- }
-
- // first < sec : -1
- private int compare(int nodeSIx1, int nodeSIx2) throws HyracksDataException {
- int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE);
- int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE);
- return (compare(n1, n2));
- }
-
- // first < sec : -1
- private int compare(int[] n1, int[] n2) throws HyracksDataException {
- // Compare Run Numbers
- if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
- return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
- }
-
- // Compare Poor man Normalized Keys
- if (n1[PNK_IX] != n2[PNK_IX]) {
- return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
- }
-
- return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
- }
-
- private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset)
- throws HyracksDataException {
- byte[] b1 = fr1.array();
- byte[] b2 = fr2.array();
- fta1.reset(fr1);
- fta2.reset(fr2);
- int headerLen = BSTNodeUtil.HEADER_SIZE;
- r1StartOffset += headerLen;
- r2StartOffset += headerLen;
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
- int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
- int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
-
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-
- private int getMinOfChildren(int nix) throws HyracksDataException { // returns index of min child
- int lix = getLeftChild(nix);
- if (lix < 0) {
- return -1;
- }
- int rix = getRightChild(nix);
- if (rix < 0) {
- return lix;
- }
- return ((compare(lix, rix) < 0) ? lix : rix);
- }
-
- //Assumption: n1Ix and n2Ix are starting indices of two elements
- private void swap(int n1Ix, int n2Ix) {
- int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
- for (int i = 0; i < ELEMENT_SIZE; i++) {
- elements[n1Ix + i] = elements[n2Ix + i];
- elements[n2Ix + i] = temp[i];
- }
- }
-
- private int getLeftChild(int ix) {
- int lix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + ELEMENT_SIZE;
- return ((lix < nextIx) ? lix : -1);
- }
-
- private int getRightChild(int ix) {
- int rix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
- return ((rix < nextIx) ? rix : -1);
- }
-
- private int getParent(int ix) {
- if (ix <= 0) {
- return -1;
- }
- return ((ix - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
- }
-
- private ByteBuffer getFrame(int frameIx) {
- return (memMgr.getFrame(frameIx));
- }
-
- @Override
- public void getMax(int[] result) {
- throw new IllegalStateException("getMax() method not applicable to Min Heap");
- }
-
- @Override
- public void peekMax(int[] result) {
- throw new IllegalStateException("getMax() method not applicable to Min Heap");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
deleted file mode 100644
index 12aa8a1..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * @author pouria
- * Implements a MinMax binary heap, used as the selection tree, in
- * sorting with replacement. Check SortMinHeap for details on comparing
- * elements.
- */
-public class SortMinMaxHeap implements ISelectionTree {
- static final int RUN_ID_IX = 0;
- static final int FRAME_IX = 1;
- static final int OFFSET_IX = 2;
- private static final int PNK_IX = 3;
- private static final int NOT_EXIST = -1;
- private static final int ELEMENT_SIZE = 4;
- private static final int INIT_ARRAY_SIZE = 512;
-
- private final int[] sortFields;
- private final IBinaryComparator[] comparators;
- private final RecordDescriptor recordDescriptor;
- private final FrameTupleAccessor fta1;
- private final FrameTupleAccessor fta2;
-
- private int[] elements;
- private int nextIx;
-
- private final IMemoryManager memMgr;
-
- public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDesc, IMemoryManager memMgr) {
- this.sortFields = sortFields;
- this.comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- this.comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- this.recordDescriptor = recordDesc;
- fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- this.memMgr = memMgr;
- this.elements = new int[INIT_ARRAY_SIZE];
- Arrays.fill(elements, -1);
- this.nextIx = 0;
- }
-
- @Override
- public void insert(int[] element) throws HyracksDataException {
- if (nextIx >= elements.length) {
- elements = Arrays.copyOf(elements, elements.length * 2);
- }
- for (int i = 0; i < ELEMENT_SIZE; i++) {
- elements[nextIx + i] = element[i];
- }
- nextIx += ELEMENT_SIZE;
- bubbleUp(nextIx - ELEMENT_SIZE);
- }
-
- @Override
- public void getMin(int[] result) throws HyracksDataException {
- if (nextIx == 0) {
- result[0] = result[1] = result[2] = result[3] = -1;
- return;
- }
-
- int[] topElem = delete(0);
- for (int x = 0; x < ELEMENT_SIZE; x++) {
- result[x] = topElem[x];
- }
- }
-
- @Override
- public void reset() {
- Arrays.fill(elements, -1);
- nextIx = 0;
- }
-
- @Override
- public boolean isEmpty() {
- return (nextIx < ELEMENT_SIZE);
- }
-
- @Override
- public void peekMin(int[] result) {
- if (nextIx == 0) {
- result[0] = result[1] = result[2] = result[3] = -1;
- return;
- }
-
- for (int x = 0; x < ELEMENT_SIZE; x++) {
- result[x] = elements[x];
- }
- }
-
- @Override
- public void getMax(int[] result) throws HyracksDataException {
- if (nextIx == ELEMENT_SIZE) {
- int[] topElement = removeLast();
- for (int x = 0; x < ELEMENT_SIZE; x++) {
- result[x] = topElement[x];
- }
- return;
- }
-
- if (nextIx > ELEMENT_SIZE) {
- int lc = getLeftChild(0);
- int rc = getRightChild(0);
- int maxIx = lc;
-
- if (rc != -1) {
- maxIx = compare(lc, rc) < 0 ? rc : lc;
- }
-
- int[] maxElem = delete(maxIx);
- for (int x = 0; x < ELEMENT_SIZE; x++) {
- result[x] = maxElem[x];
- }
- return;
- }
-
- result[0] = result[1] = result[2] = result[3] = -1;
-
- }
-
- @Override
- public void peekMax(int[] result) throws HyracksDataException {
- if (nextIx == ELEMENT_SIZE) {
- for (int i = 0; i < ELEMENT_SIZE; i++) {
- result[i] = elements[i];
- }
- return;
- }
- if (nextIx > ELEMENT_SIZE) {
- int lc = getLeftChild(0);
- int rc = getRightChild(0);
- int maxIx = lc;
-
- if (rc != -1) {
- maxIx = compare(lc, rc) < 0 ? rc : lc;
- }
-
- for (int x = 0; x < ELEMENT_SIZE; x++) {
- result[x] = elements[maxIx + x];
- }
-
- return;
- }
- result[0] = result[1] = result[2] = result[3] = -1;
- }
-
- private int[] delete(int delIx) throws HyracksDataException {
- int s = nextIx;
- if (nextIx > ELEMENT_SIZE) {
- int[] delEntry = Arrays.copyOfRange(elements, delIx, delIx + ELEMENT_SIZE);
- int[] last = removeLast();
- if (delIx != (s - ELEMENT_SIZE)) {
- for (int x = 0; x < ELEMENT_SIZE; x++) {
- elements[delIx + x] = last[x];
- }
- trickleDown(delIx);
- }
- return delEntry;
- } else if (nextIx == ELEMENT_SIZE) {
- return (removeLast());
- }
- return null;
- }
-
- private int[] removeLast() {
- if (nextIx < ELEMENT_SIZE) { //this is the very last element
- return new int[] { -1, -1, -1, -1 };
- }
- int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
- Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
- nextIx -= ELEMENT_SIZE;
- return l;
- }
-
- private void bubbleUp(int ix) throws HyracksDataException {
- int p = getParentIx(ix);
- if (isAtMinLevel(ix)) {
- if (p != NOT_EXIST && compare(p, ix) < 0) {
- swap(ix, p);
- bubbleUpMax(p);
- } else {
- bubbleUpMin(ix);
- }
- } else { // i is at max level
- if (p != NOT_EXIST && compare(ix, p) < 0) {
- swap(ix, p);
- bubbleUpMin(p);
- } else {
- bubbleUpMax(ix);
- }
- }
- }
-
- private void bubbleUpMax(int ix) throws HyracksDataException {
- int gp = getGrandParent(ix);
- if (gp != NOT_EXIST && compare(gp, ix) < 0) {
- swap(ix, gp);
- bubbleUpMax(gp);
- }
- }
-
- private void bubbleUpMin(int ix) throws HyracksDataException {
- int gp = getGrandParent(ix);
- if (gp != NOT_EXIST && compare(ix, gp) < 0) {
- swap(ix, gp);
- bubbleUpMin(gp);
- }
- }
-
- private void trickleDown(int ix) throws HyracksDataException {
- if (isAtMinLevel(ix)) {
- trickleDownMin(ix);
- } else {
- trickleDownMax(ix);
- }
- }
-
- private void trickleDownMax(int ix) throws HyracksDataException {
- int maxIx = getMaxOfDescendents(ix);
- if (maxIx == NOT_EXIST) {
- return;
- }
- if (maxIx > getLeftChild(ix) && maxIx > getRightChild(ix)) { // A grand
- // children
- if (compare(ix, maxIx) < 0) {
- swap(maxIx, ix);
- int p = getParentIx(maxIx);
- if (p != NOT_EXIST && compare(maxIx, p) < 0) {
- swap(maxIx, p);
- }
- trickleDownMax(maxIx);
- }
- } else { // A children
- if (compare(ix, maxIx) < 0) {
- swap(ix, maxIx);
- }
- }
- }
-
- private void trickleDownMin(int ix) throws HyracksDataException {
- int minIx = getMinOfDescendents(ix);
- if (minIx == NOT_EXIST) {
- return;
- }
- if (minIx > getLeftChild(ix) && minIx > getRightChild(ix)) { // A grand
- // children
- if (compare(minIx, ix) < 0) {
- swap(minIx, ix);
- int p = getParentIx(minIx);
- if (p != NOT_EXIST && compare(p, minIx) < 0) {
- swap(minIx, p);
- }
- trickleDownMin(minIx);
- }
- } else { // A children
- if (compare(minIx, ix) < 0) {
- swap(ix, minIx);
- }
- }
- }
-
- // Min among children and grand children
- private int getMinOfDescendents(int ix) throws HyracksDataException {
- int lc = getLeftChild(ix);
- if (lc == NOT_EXIST) {
- return NOT_EXIST;
- }
- int rc = getRightChild(ix);
- if (rc == NOT_EXIST) {
- return lc;
- }
- int min = (compare(lc, rc) < 0) ? lc : rc;
- int[] lgc = getLeftGrandChildren(ix);
- int[] rgc = getRightGrandChildren(ix);
- for (int k = 0; k < 2; k++) {
- if (lgc[k] != NOT_EXIST && compare(lgc[k], min) < 0) {
- min = lgc[k];
- }
- if (rgc[k] != NOT_EXIST && compare(rgc[k], min) < 0) {
- min = rgc[k];
- }
- }
- return min;
- }
-
- // Max among children and grand children
- private int getMaxOfDescendents(int ix) throws HyracksDataException {
- int lc = getLeftChild(ix);
- if (lc == NOT_EXIST) {
- return NOT_EXIST;
- }
- int rc = getRightChild(ix);
- if (rc == NOT_EXIST) {
- return lc;
- }
- int max = (compare(lc, rc) < 0) ? rc : lc;
- int[] lgc = getLeftGrandChildren(ix);
- int[] rgc = getRightGrandChildren(ix);
- for (int k = 0; k < 2; k++) {
- if (lgc[k] != NOT_EXIST && compare(max, lgc[k]) < 0) {
- max = lgc[k];
- }
- if (rgc[k] != NOT_EXIST && compare(max, rgc[k]) < 0) {
- max = rgc[k];
- }
- }
- return max;
- }
-
- private void swap(int n1Ix, int n2Ix) {
- int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
- for (int i = 0; i < ELEMENT_SIZE; i++) {
- elements[n1Ix + i] = elements[n2Ix + i];
- elements[n2Ix + i] = temp[i];
- }
- }
-
- private int getParentIx(int i) {
- if (i < ELEMENT_SIZE) {
- return NOT_EXIST;
- }
- return ((i - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
- }
-
- private int getGrandParent(int i) {
- int p = getParentIx(i);
- return p != -1 ? getParentIx(p) : NOT_EXIST;
- }
-
- private int getLeftChild(int i) {
- int lc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + ELEMENT_SIZE;
- return (lc < nextIx ? lc : -1);
- }
-
- private int[] getLeftGrandChildren(int i) {
- int lc = getLeftChild(i);
- return lc != NOT_EXIST ? new int[] { getLeftChild(lc), getRightChild(lc) } : new int[] { NOT_EXIST, NOT_EXIST };
- }
-
- private int getRightChild(int i) {
- int rc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
- return (rc < nextIx ? rc : -1);
- }
-
- private int[] getRightGrandChildren(int i) {
- int rc = getRightChild(i);
- return rc != NOT_EXIST ? new int[] { getLeftChild(rc), getRightChild(rc) } : new int[] { NOT_EXIST, NOT_EXIST };
- }
-
- private boolean isAtMinLevel(int i) {
- int l = getLevel(i);
- return l % 2 == 0 ? true : false;
- }
-
- private int getLevel(int i) {
- if (i < ELEMENT_SIZE) {
- return 0;
- }
-
- int cnv = i / ELEMENT_SIZE;
- int l = (int) Math.floor(Math.log(cnv) / Math.log(2));
- if (cnv == (((int) Math.pow(2, (l + 1))) - 1)) {
- return (l + 1);
- }
- return l;
- }
-
- private ByteBuffer getFrame(int frameIx) {
- return (memMgr.getFrame(frameIx));
- }
-
- // first < sec : -1
- private int compare(int nodeSIx1, int nodeSIx2) throws HyracksDataException {
- int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE); //tree.get(nodeSIx1);
- int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE); //tree.get(nodeSIx2);
- return (compare(n1, n2));
- }
-
- // first < sec : -1
- private int compare(int[] n1, int[] n2) throws HyracksDataException {
- // Compare Run Numbers
- if (n1[RUN_ID_IX] != n2[RUN_ID_IX]) {
- return (n1[RUN_ID_IX] < n2[RUN_ID_IX] ? -1 : 1);
- }
-
- // Compare Poor man Normalized Keys
- if (n1[PNK_IX] != n2[PNK_IX]) {
- return ((((long) n1[PNK_IX]) & 0xffffffffL) < (((long) n2[PNK_IX]) & 0xffffffffL)) ? -1 : 1;
- }
-
- return compare(getFrame(n1[FRAME_IX]), getFrame(n2[FRAME_IX]), n1[OFFSET_IX], n2[OFFSET_IX]);
- }
-
- private int compare(ByteBuffer fr1, ByteBuffer fr2, int r1StartOffset, int r2StartOffset)
- throws HyracksDataException {
- byte[] b1 = fr1.array();
- byte[] b2 = fr2.array();
- fta1.reset(fr1);
- fta2.reset(fr2);
- int headerLen = BSTNodeUtil.HEADER_SIZE;
- r1StartOffset += headerLen;
- r2StartOffset += headerLen;
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : fr1.getInt(r1StartOffset + (fIdx - 1) * 4);
- int f1End = fr1.getInt(r1StartOffset + fIdx * 4);
- int s1 = r1StartOffset + fta1.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : fr2.getInt(r2StartOffset + (fIdx - 1) * 4);
- int f2End = fr2.getInt(r2StartOffset + fIdx * 4);
- int s2 = r2StartOffset + fta2.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
-
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
new file mode 100644
index 0000000..ee43993
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.value.*;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+
+import java.util.List;
+
+/**
+ * Sorter operator descriptor parameterized by a {@code topK} limit. The limit is forwarded both
+ * to the run generator ({@link HybridTopKSortRunGenerator}) and to the run merger
+ * ({@link ExternalSortRunMerger}). Presumably this bounds the sorted output to its first
+ * {@code topK} records; confirm against those classes.
+ */
+public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
+
+    /** Limit handed to both the sort-phase run generator and the merge-phase run merger. */
+    private final int topK;
+
+    /**
+     * @param spec registry passed through to the superclass constructor
+     * @param framesLimit frame limit passed through to the superclass constructor
+     * @param topK limit forwarded to the run generator and the run merger
+     * @param sortFields indices of the fields to sort on
+     * @param firstKeyNormalizerFactory normalized-key computer factory passed to the superclass
+     * @param comparatorFactories comparator factories passed to the superclass
+     * @param recordDescriptor record descriptor passed to the superclass
+     */
+    public TopKSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super(spec, framesLimit, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+        this.topK = topK;
+    }
+
+    /** Creates the sort activity, whose runs are produced by a {@link HybridTopKSortRunGenerator}. */
+    @Override
+    public SortActivity getSortActivity(ActivityId id) {
+        return new SortActivity(id) {
+            @Override
+            protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider) {
+                RecordDescriptor inRecDesc = recordDescriptors[0];
+                return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields,
+                        firstKeyNormalizerFactory, comparatorFactories, inRecDesc);
+            }
+        };
+    }
+
+    /** Creates the merge activity, whose {@link ExternalSortRunMerger} also receives {@code topK}. */
+    @Override
+    public MergeActivity getMergeActivity(ActivityId id) {
+        return new MergeActivity(id) {
+            @Override
+            protected ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+                    IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
+                    List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators,
+                    INormalizedKeyComputer nmkComputer, int necessaryFrames) {
+                RecordDescriptor outRecDesc = recordDescriptors[0];
+                return new ExternalSortRunMerger(ctx, sorter, runs, sortFields, comparators, nmkComputer,
+                        outRecDesc, necessaryFrames, topK, writer);
+            }
+        };
+    }
+}