You are viewing a plain-text version of this content. (The original page linked to the canonical version of this message; that link was not preserved in this copy.)
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:24 UTC
[07/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 9178094..6b36480 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -1,318 +1,271 @@
/*
* Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
-
-/**
- * @author pouria This class defines the logic for merging the run, generated
- * during the first phase of external sort (for both sorting without
- * replacement selection and with it). For the case with replacement
- * selection, this code also takes the limit on the output into account
- * (if specified). If number of input runs is less than the available
- * memory frames, then merging can be done in one pass, by allocating
- * one buffer per run, and one buffer as the output buffer. A
- * priorityQueue is used to find the top tuple at each iteration, among
- * all the runs' heads in memory (check RunMergingFrameReader for more
- * details). Otherwise, assuming that we have R runs and M memory
- * buffers, where (R > M), we first merge first (M-1) runs and create a
- * new sorted run, out of them. Discarding the first (M-1) runs, now
- * merging procedure gets applied recursively on the (R-M+2) remaining
- * runs using the M memory buffers. For the case of replacement
- * selection, if outputLimit is specified, once the final pass is done
- * on the runs (which is the pass that generates the final sorted
- * output), as soon as the output size hits the output limit, the
- * process stops, closes, and returns.
- */
+import edu.uci.ics.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
public class ExternalSortRunMerger {
- private final IHyracksTaskContext ctx;
- private final List<IFrameReader> runs;
+ protected final IHyracksTaskContext ctx;
+ protected final IFrameWriter writer;
+
+ private final List<RunAndMaxFrameSizePair> runs;
+ private final BitSet currentGenerationRunAvailable;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
private final INormalizedKeyComputer nmkComputer;
private final RecordDescriptor recordDesc;
private final int framesLimit;
- private final IFrameWriter writer;
- private List<ByteBuffer> inFrames;
- private ByteBuffer outFrame;
- private FrameTupleAppender outFrameAppender;
-
- private IFrameSorter frameSorter; // Used in External sort, no replacement
- // selection
- private FrameTupleAccessor outFrameAccessor; // Used in External sort, with
- // replacement selection
- private final int outputLimit; // Used in External sort, with replacement
- // selection and limit on output size
- private int currentSize; // Used in External sort, with replacement
- // selection and limit on output size
-
- // Constructor for external sort, no replacement selection
- public ExternalSortRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
+ private final int MAX_FRAME_SIZE;
+ private final ArrayList<IFrameReader> tempRuns;
+ private final int topK;
+ private List<GroupVSizeFrame> inFrames;
+ private VSizeFrame outputFrame;
+ private ISorter sorter;
+
+ private static final Logger LOGGER = Logger.getLogger(ExternalSortRunMerger.class.getName());
+
+ public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
- this.ctx = ctx;
- this.frameSorter = frameSorter;
- this.runs = new LinkedList<IFrameReader>(runs);
- this.sortFields = sortFields;
- this.comparators = comparators;
- this.nmkComputer = nmkComputer;
- this.recordDesc = recordDesc;
- this.framesLimit = framesLimit;
- this.writer = writer;
- this.outputLimit = -1;
+ this(ctx, sorter, runs, sortFields, comparators, nmkComputer, recordDesc, framesLimit,
+ Integer.MAX_VALUE, writer);
}
- // Constructor for external sort with replacement selection
- public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
- IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, RecordDescriptor recordDesc,
- int framesLimit, IFrameWriter writer) {
+ public ExternalSortRunMerger(IHyracksTaskContext ctx, ISorter sorter, List<RunAndMaxFrameSizePair> runs,
+ int[] sortFields, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+ RecordDescriptor recordDesc, int framesLimit, int topK, IFrameWriter writer) {
this.ctx = ctx;
- this.runs = new LinkedList<IFrameReader>(runs);
+ this.sorter = sorter;
+ this.runs = new LinkedList<>(runs);
+ this.currentGenerationRunAvailable = new BitSet(runs.size());
this.sortFields = sortFields;
this.comparators = comparators;
this.nmkComputer = nmkComputer;
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
- this.outputLimit = outputLimit;
- this.currentSize = 0;
- this.frameSorter = null;
+ this.MAX_FRAME_SIZE = FrameConstants.MAX_NUM_MINFRAME * ctx.getInitialFrameSize();
+ this.topK = topK;
+ this.tempRuns = new ArrayList<>(runs.size());
}
public void process() throws HyracksDataException {
- writer.open();
+ IFrameWriter finalWriter = null;
try {
if (runs.size() <= 0) {
- if (frameSorter != null && frameSorter.getFrameCount() > 0) {
- frameSorter.flushFrames(writer);
+ finalWriter = prepareSkipMergingFinalResultWriter(writer);
+ finalWriter.open();
+ if (sorter != null) {
+ if (sorter.hasRemaining()) {
+ sorter.flush(finalWriter);
+ }
+ sorter.close();
}
- /** recycle sort buffer */
- frameSorter.close();
} else {
/** recycle sort buffer */
- frameSorter.close();
-
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.allocateFrame();
- outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
- outFrameAppender.reset(outFrame, true);
- for (int i = 0; i < framesLimit - 1; ++i) {
- inFrames.add(ctx.allocateFrame());
+ if (sorter != null) {
+ sorter.close();
}
+
+ finalWriter = prepareFinalMergeResultWriter(writer);
+ finalWriter.open();
+
int maxMergeWidth = framesLimit - 1;
- while (runs.size() > maxMergeWidth) {
- int generationSeparator = 0;
- while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
- int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
- runs.size() - maxMergeWidth + 1);
- FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class
- .getSimpleName());
- IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
- mergeResultWriter.open();
- IFrameReader[] runCursors = new RunFileReader[mergeWidth];
- for (int i = 0; i < mergeWidth; i++) {
- runCursors[i] = runs.get(generationSeparator + i);
+
+ inFrames = new ArrayList<>(maxMergeWidth);
+ outputFrame = new VSizeFrame(ctx);
+ List<RunAndMaxFrameSizePair> partialRuns = new ArrayList<>(maxMergeWidth);
+
+ int stop = runs.size();
+ currentGenerationRunAvailable.set(0, stop);
+
+ while (true) {
+
+ int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
+ currentGenerationRunAvailable,
+ stop);
+ prepareFrames(unUsed, inFrames, partialRuns);
+
+ if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
+ IFrameReader reader;
+ int mergedMaxFrameSize;
+ if (partialRuns.size() == 1) {
+ if (!currentGenerationRunAvailable.isEmpty()) {
+ throw new HyracksDataException(
+ "The record is too big to put into the merging frame, please"
+ + " allocate more sorting memory");
+ } else {
+ reader = partialRuns.get(0).run;
+ mergedMaxFrameSize = partialRuns.get(0).maxFrameSize;
+ }
+
+ } else {
+ RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
+ IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
+
+ mergeResultWriter.open();
+ mergedMaxFrameSize = merge(mergeResultWriter, partialRuns);
+ mergeResultWriter.close();
+
+ reader = mergeFileWriter.createReader();
}
- merge(mergeResultWriter, runCursors);
- mergeResultWriter.close();
- runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
- runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
- }
- }
- if (!runs.isEmpty()) {
- IFrameReader[] runCursors = new RunFileReader[runs.size()];
- for (int i = 0; i < runCursors.length; i++) {
- runCursors[i] = runs.get(i);
+
+ appendNewRuns(reader, mergedMaxFrameSize);
+ if (currentGenerationRunAvailable.isEmpty()) {
+
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("generated runs:" + stop);
+ }
+ runs.subList(0, stop).clear();
+ currentGenerationRunAvailable.clear();
+ currentGenerationRunAvailable.set(0, runs.size());
+ stop = runs.size();
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("final runs:" + stop);
+ }
+ merge(finalWriter, partialRuns);
+ break;
}
- merge(writer, runCursors);
}
}
} catch (Exception e) {
- writer.fail();
+ finalWriter.fail();
throw new HyracksDataException(e);
} finally {
- writer.close();
+ finalWriter.close();
}
}
- private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
- RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields, comparators,
- nmkComputer, recordDesc);
- merger.open();
- try {
- while (merger.nextFrame(outFrame)) {
- FrameUtils.flushFrame(outFrame, mergeResultWriter);
+ private void appendNewRuns(IFrameReader reader, int mergedPartialMaxSize) {
+ runs.add(new RunAndMaxFrameSizePair(reader, mergedPartialMaxSize));
+ }
+
+ private static int selectPartialRuns(int budget, List<RunAndMaxFrameSizePair> runs,
+ List<RunAndMaxFrameSizePair> partialRuns, BitSet runAvailable, int stop) {
+ partialRuns.clear();
+ int maxFrameSizeOfGenRun = 0;
+ int nextRunId = runAvailable.nextSetBit(0);
+ while (budget > 0 && nextRunId >= 0 && nextRunId < stop) {
+ int runFrameSize = runs.get(nextRunId).maxFrameSize;
+ if (budget - runFrameSize >= 0) {
+ partialRuns.add(runs.get(nextRunId));
+ budget -= runFrameSize;
+ runAvailable.clear(nextRunId);
+ maxFrameSizeOfGenRun = runFrameSize > maxFrameSizeOfGenRun ? runFrameSize : maxFrameSizeOfGenRun;
}
- } finally {
- merger.close();
+ nextRunId = runAvailable.nextSetBit(nextRunId + 1);
}
+ return budget;
}
- public void processWithReplacementSelection() throws HyracksDataException {
- writer.open();
- try {
- outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
- outFrame = ctx.allocateFrame();
- outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
- outFrameAppender.reset(outFrame, true);
- if (runs.size() == 1) {
- if (outputLimit < 1) {
- runs.get(0).open();
- ByteBuffer nextFrame = ctx.allocateFrame();
- while (runs.get(0).nextFrame(nextFrame)) {
- FrameUtils.flushFrame(nextFrame, writer);
- outFrameAppender.reset(nextFrame, true);
- }
- return;
- }
- // Limit on the output size
- int totalCount = 0;
- runs.get(0).open();
- FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
- ByteBuffer nextFrame = ctx.allocateFrame();
- while (totalCount <= outputLimit && runs.get(0).nextFrame(nextFrame)) {
- fta.reset(nextFrame);
- int tupCount = fta.getTupleCount();
- if ((totalCount + tupCount) < outputLimit) {
- FrameUtils.flushFrame(nextFrame, writer);
- totalCount += tupCount;
- continue;
- }
- // The very last buffer, which exceeds the limit
- int copyCount = outputLimit - totalCount;
- outFrameAppender.reset(outFrame, true);
- for (int i = 0; i < copyCount; i++) {
- if (!outFrameAppender.append(fta, i)) {
- throw new HyracksDataException("Record size ("
- + (fta.getTupleEndOffset(i) - fta.getTupleStartOffset(i))
- + ") larger than frame size (" + outFrameAppender.getBuffer().capacity() + ")");
- }
- totalCount++;
- }
- }
- if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- }
- return;
+ private void prepareFrames(int extraFreeMem, List<GroupVSizeFrame> inFrames,
+ List<RunAndMaxFrameSizePair> patialRuns)
+ throws HyracksDataException {
+ if (extraFreeMem > 0 && patialRuns.size() > 1) {
+ int extraFrames = extraFreeMem / ctx.getInitialFrameSize();
+ int avg = (extraFrames / patialRuns.size()) * ctx.getInitialFrameSize();
+ int residue = (extraFrames % patialRuns.size());
+ for (int i = 0; i < residue; i++) {
+ patialRuns.get(i).updateSize(
+ Math.min(MAX_FRAME_SIZE, patialRuns.get(i).maxFrameSize + avg + ctx.getInitialFrameSize()));
}
- // More than one run, actual merging is needed
- inFrames = new ArrayList<ByteBuffer>();
- for (int i = 0; i < framesLimit - 1; ++i) {
- inFrames.add(ctx.allocateFrame());
- }
- while (runs.size() > 0) {
- try {
- doPassWithReplacementSelection(runs);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ for (int i = residue; i < patialRuns.size() && avg > 0; i++) {
+ patialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE, patialRuns.get(i).maxFrameSize + avg));
}
+ }
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
+ if (inFrames.size() > patialRuns.size()) {
+ inFrames.subList(patialRuns.size(), inFrames.size()).clear();
+ }
+ int i;
+ for (i = 0; i < inFrames.size(); i++) {
+ inFrames.get(i).resize(patialRuns.get(i).maxFrameSize);
+ }
+ for (; i < patialRuns.size(); i++) {
+ inFrames.add(new GroupVSizeFrame(ctx, patialRuns.get(i).maxFrameSize));
}
}
- // creates a new run from runs that can fit in memory.
- private void doPassWithReplacementSelection(List<IFrameReader> runs) throws HyracksDataException {
- FileReference newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
- if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
- finalPass = true;
- for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
- inFrames.remove(i);
- }
- } else {
- newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
- writer = new RunFileWriter(newRun, ctx.getIOManager());
- writer.open();
- }
- try {
- IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- runCursors[i] = runs.get(i);
- }
- RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
- comparators, nmkComputer, recordDesc);
- merger.open();
- try {
- while (merger.nextFrame(outFrame)) {
- if (outputLimit > 0 && finalPass) {
- outFrameAccessor.reset(outFrame);
- int count = outFrameAccessor.getTupleCount();
- if ((currentSize + count) > outputLimit) {
- ByteBuffer b = ctx.allocateFrame();
- FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
- partialAppender.reset(b, true);
- int copyCount = outputLimit - currentSize;
- for (int i = 0; i < copyCount; i++) {
- partialAppender.append(outFrameAccessor, i);
- currentSize++;
- }
- FrameUtils.makeReadable(b);
- FrameUtils.flushFrame(b, writer);
- break;
- } else {
- FrameUtils.flushFrame(outFrame, writer);
- currentSize += count;
- }
- } else {
- FrameUtils.flushFrame(outFrame, writer);
- }
- }
- } finally {
- merger.close();
- }
+ protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+ return nextWriter;
+ }
- if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
- runs.clear();
- return;
- }
+ protected RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException {
+ FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
+ return new RunFileWriter(newRun, ctx.getIOManager());
+ }
+
+ protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+ throws HyracksDataException {
+ return mergeFileWriter;
+ }
+
+ protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException {
+ return nextWriter;
+ }
+
+ protected int[] getSortFields() {
+ return sortFields;
+ }
- runs.subList(0, inFrames.size()).clear();
- if (!finalPass) {
- runs.add(0, ((RunFileWriter) writer).createReader());
+ private int merge(IFrameWriter writer, List<RunAndMaxFrameSizePair> partialRuns)
+ throws HyracksDataException {
+ tempRuns.clear();
+ for (int i = 0; i < partialRuns.size(); i++) {
+ tempRuns.add(partialRuns.get(i).run);
+ }
+ RunMergingFrameReader merger = new RunMergingFrameReader(ctx, tempRuns, inFrames, getSortFields(),
+ comparators, nmkComputer, recordDesc, topK);
+ int maxFrameSize = 0;
+ int io = 0;
+ merger.open();
+ try {
+ while (merger.nextFrame(outputFrame)) {
+ FrameUtils.flushFrame(outputFrame.getBuffer(), writer);
+ maxFrameSize = maxFrameSize < outputFrame.getFrameSize() ? outputFrame.getFrameSize() : maxFrameSize;
+ io++;
}
} finally {
- if (!finalPass) {
- writer.close();
+ merger.close();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Output " + io + " frames");
}
}
+ return maxFrameSize;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
index 8dbdbd4..82a8453 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterMergeSort.java
@@ -1,161 +1,69 @@
/*
* Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
-public class FrameSorterMergeSort implements IFrameSorter {
- private final IHyracksTaskContext ctx;
- private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
- private final IBinaryComparator[] comparators;
- private final List<ByteBuffer> buffers;
+public class FrameSorterMergeSort extends AbstractFrameSorter {
- private final FrameTupleAccessor fta1;
- private final FrameTupleAccessor fta2;
-
- private final FrameTupleAppender appender;
-
- private final ByteBuffer outFrame;
-
- private int dataFrameCount;
- private int[] tPointers;
private int[] tPointersTemp;
- private int tupleCount;
+ private FrameTupleAccessor fta2;
- public FrameSorterMergeSort(IHyracksTaskContext ctx, int[] sortFields,
+ public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) throws HyracksDataException {
- this.ctx = ctx;
- this.sortFields = sortFields;
- nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- buffers = new ArrayList<ByteBuffer>();
- fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- appender = new FrameTupleAppender(ctx.getFrameSize());
- outFrame = ctx.allocateFrame();
-
- dataFrameCount = 0;
+ this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+ Integer.MAX_VALUE);
}
- @Override
- public void reset() {
- dataFrameCount = 0;
- tupleCount = 0;
- }
-
- @Override
- public int getFrameCount() {
- return dataFrameCount;
- }
-
- @Override
- public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer copyFrame;
- if (dataFrameCount == buffers.size()) {
- copyFrame = ctx.allocateFrame();
- buffers.add(copyFrame);
- } else {
- copyFrame = buffers.get(dataFrameCount);
- }
- FrameUtils.copy(buffer, copyFrame);
- ++dataFrameCount;
+ public FrameSorterMergeSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
+ super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+ outputLimit);
+ fta2 = new FrameTupleAccessor(recordDescriptor);
}
@Override
- public void sortFrames() throws HyracksDataException {
- int nBuffers = dataFrameCount;
- tupleCount = 0;
- for (int i = 0; i < nBuffers; ++i) {
- fta1.reset(buffers.get(i));
- tupleCount += fta1.getTupleCount();
- }
- int sfIdx = sortFields[0];
- tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
- int ptr = 0;
- for (int i = 0; i < nBuffers; ++i) {
- fta1.reset(buffers.get(i));
- int tCount = fta1.getTupleCount();
- byte[] array = fta1.getBuffer().array();
- for (int j = 0; j < tCount; ++j) {
- int tStart = fta1.getTupleStartOffset(j);
- int tEnd = fta1.getTupleEndOffset(j);
- tPointers[ptr * 4] = i;
- tPointers[ptr * 4 + 1] = tStart;
- tPointers[ptr * 4 + 2] = tEnd;
- int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
- int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
- int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
- tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
- ++ptr;
- }
- }
- if (tupleCount > 0) {
+ void sortTupleReferences() throws HyracksDataException {
+ if (tPointersTemp == null || tPointersTemp.length < tPointers.length) {
tPointersTemp = new int[tPointers.length];
- sort(0, tupleCount);
}
+ sort(0, tupleCount);
}
@Override
- public void flushFrames(IFrameWriter writer) throws HyracksDataException {
- appender.reset(outFrame, true);
- for (int ptr = 0; ptr < tupleCount; ++ptr) {
- int i = tPointers[ptr * 4];
- int tStart = tPointers[ptr * 4 + 1];
- int tEnd = tPointers[ptr * 4 + 2];
- ByteBuffer buffer = buffers.get(i);
- fta1.reset(buffer);
- if (!appender.append(fta1, tStart, tEnd)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!appender.append(fta1, tStart, tEnd)) {
- throw new HyracksDataException("Record size (" + (tEnd - tStart) + ") larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
+ public void close() {
+ super.close();
+ tPointersTemp = null;
}
- private void sort(int offset, int length) throws HyracksDataException {
+ void sort(int offset, int length) throws HyracksDataException {
int step = 1;
- int len = length;
- int end = offset + len;
+ int end = offset + length;
/** bottom-up merge */
- while (step < len) {
+ while (step < length) {
/** merge */
for (int i = offset; i < end; i += 2 * step) {
int next = i + step;
@@ -175,8 +83,6 @@ public class FrameSorterMergeSort implements IFrameSorter {
/**
* Merge two subarrays into one
- *
- * @throws HyracksDataException
*/
private void merge(int start1, int start2, int len1, int len2) throws HyracksDataException {
int targetPos = start1;
@@ -226,20 +132,20 @@ public class FrameSorterMergeSort implements IFrameSorter {
}
int i2 = tp2i;
int j2 = tp2j;
- ByteBuffer buf1 = buffers.get(i1);
- ByteBuffer buf2 = buffers.get(i2);
+ ByteBuffer buf1 = super.bufferManager.getFrame(i1);
+ ByteBuffer buf2 = super.bufferManager.getFrame(i2);
byte[] b1 = buf1.array();
byte[] b2 = buf2.array();
- fta1.reset(buf1);
+ inputTupleAccessor.reset(buf1);
fta2.reset(buf2);
for (int f = 0; f < comparators.length; ++f) {
int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf1.array(), j1 + (fIdx - 1) * 4);
- int f1End = IntSerDeUtils.getInt(buf1.array(), j1 + fIdx * 4);
- int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
+ int f1Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b1, j1 + (fIdx - 1) * 4);
+ int f1End = IntSerDeUtils.getInt(b1, j1 + fIdx * 4);
+ int s1 = j1 + inputTupleAccessor.getFieldSlotsLength() + f1Start;
int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(buf2.array(), j2 + (fIdx - 1) * 4);
- int f2End = IntSerDeUtils.getInt(buf2.array(), j2 + fIdx * 4);
+ int f2Start = fIdx == 0 ? 0 : IntSerDeUtils.getInt(b2, j2 + (fIdx - 1) * 4);
+ int f2End = IntSerDeUtils.getInt(b2, j2 + fIdx * 4);
int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
int l2 = f2End - f2Start;
int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
@@ -250,8 +156,4 @@ public class FrameSorterMergeSort implements IFrameSorter {
return 0;
}
- @Override
- public void close() {
- this.buffers.clear();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
index d607a51..328bb5e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorterQuickSort.java
@@ -1,153 +1,54 @@
/*
* Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
-public class FrameSorterQuickSort implements IFrameSorter {
- private final IHyracksTaskContext ctx;
- private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
- private final IBinaryComparator[] comparators;
- private final List<ByteBuffer> buffers;
+public class FrameSorterQuickSort extends AbstractFrameSorter {
- private final FrameTupleAccessor fta1;
- private final FrameTupleAccessor fta2;
+ private FrameTupleAccessor fta2;
- private final FrameTupleAppender appender;
-
- private final ByteBuffer outFrame;
-
- private int dataFrameCount;
- private int[] tPointers;
- private int tupleCount;
-
- public FrameSorterQuickSort(IHyracksTaskContext ctx, int[] sortFields,
+ public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) throws HyracksDataException {
- this.ctx = ctx;
- this.sortFields = sortFields;
- nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- buffers = new ArrayList<ByteBuffer>();
- fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- appender = new FrameTupleAppender(ctx.getFrameSize());
- outFrame = ctx.allocateFrame();
-
- dataFrameCount = 0;
- }
-
- @Override
- public void reset() {
- dataFrameCount = 0;
- tupleCount = 0;
- }
-
- @Override
- public int getFrameCount() {
- return dataFrameCount;
+ this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+ Integer.MAX_VALUE);
}
- @Override
- public void insertFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer copyFrame;
- if (dataFrameCount == buffers.size()) {
- copyFrame = ctx.allocateFrame();
- buffers.add(copyFrame);
- } else {
- copyFrame = buffers.get(dataFrameCount);
- }
- FrameUtils.copy(buffer, copyFrame);
- ++dataFrameCount;
- }
-
- @Override
- public void sortFrames() throws HyracksDataException {
- int nBuffers = dataFrameCount;
- tupleCount = 0;
- for (int i = 0; i < nBuffers; ++i) {
- fta1.reset(buffers.get(i));
- tupleCount += fta1.getTupleCount();
- }
- int sfIdx = sortFields[0];
- tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
- int ptr = 0;
- for (int i = 0; i < nBuffers; ++i) {
- fta1.reset(buffers.get(i));
- int tCount = fta1.getTupleCount();
- byte[] array = fta1.getBuffer().array();
- for (int j = 0; j < tCount; ++j) {
- int tStart = fta1.getTupleStartOffset(j);
- int tEnd = fta1.getTupleEndOffset(j);
- tPointers[ptr * 4] = i;
- tPointers[ptr * 4 + 1] = tStart;
- tPointers[ptr * 4 + 2] = tEnd;
- int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
- int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
- int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
- tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
- ++ptr;
- }
- }
- if (tupleCount > 0) {
- sort(tPointers, 0, tupleCount);
- }
+ public FrameSorterQuickSort(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, int outputLimit) throws HyracksDataException {
+ super(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+ outputLimit);
+ fta2 = new FrameTupleAccessor(recordDescriptor);
}
@Override
- public void flushFrames(IFrameWriter writer) throws HyracksDataException {
- appender.reset(outFrame, true);
- for (int ptr = 0; ptr < tupleCount; ++ptr) {
- int i = tPointers[ptr * 4];
- int tStart = tPointers[ptr * 4 + 1];
- int tEnd = tPointers[ptr * 4 + 2];
- ByteBuffer buffer = buffers.get(i);
- fta1.reset(buffer);
- if (!appender.append(fta1, tStart, tEnd)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!appender.append(fta1, tStart, tEnd)) {
- throw new HyracksDataException("Record size (" + (tEnd - tStart) + ") larger than frame size ("
- + appender.getBuffer().capacity() + ")");
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
+    void sortTupleReferences() throws HyracksDataException {
+        if (tupleCount > 0) { sort(tPointers, 0, tupleCount); } // sort() reads tPointers[m * 4] even for length 0
}
- private void sort(int[] tPointers, int offset, int length) throws HyracksDataException {
+ void sort(int[] tPointers, int offset, int length) throws HyracksDataException {
int m = offset + (length >> 1);
int mi = tPointers[m * 4];
int mj = tPointers[m * 4 + 1];
@@ -221,17 +122,17 @@ public class FrameSorterQuickSort implements IFrameSorter {
}
int i2 = tp2i;
int j2 = tp2j;
- ByteBuffer buf1 = buffers.get(i1);
- ByteBuffer buf2 = buffers.get(i2);
+ ByteBuffer buf1 = super.bufferManager.getFrame(i1);
+ ByteBuffer buf2 = super.bufferManager.getFrame(i2);
byte[] b1 = buf1.array();
byte[] b2 = buf2.array();
- fta1.reset(buf1);
+ inputTupleAccessor.reset(buf1);
fta2.reset(buf2);
for (int f = 0; f < comparators.length; ++f) {
int fIdx = sortFields[f];
int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
int f1End = buf1.getInt(j1 + fIdx * 4);
- int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
+ int s1 = j1 + inputTupleAccessor.getFieldSlotsLength() + f1Start;
int l1 = f1End - f1Start;
int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
int f2End = buf2.getInt(j2 + fIdx * 4);
@@ -245,8 +146,4 @@ public class FrameSorterQuickSort implements IFrameSorter {
return 0;
}
- @Override
- public void close() {
- this.buffers.clear();
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
new file mode 100644
index 0000000..564a462
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFramePool;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.ITupleBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableTupleMemoryManager;
+
+/**
+ * Run generator that buffers incoming tuples in a {@link TupleSorterHeapSort}.
+ * <p>
+ * The sort buffer is limited to {@code (frameLimit - 1)} frames of the context's initial frame size. When a tuple
+ * is rejected, {@code flushFramesToRun()} is called and the insertion is retried once.
+ */
+public class HeapSortRunGenerator extends AbstractSortRunGenerator {
+    protected final IHyracksTaskContext ctx;
+    protected final int frameLimit;
+    // Forwarded to TupleSorterHeapSort as-is.
+    protected final int topK;
+    protected final int[] sortFields;
+    protected final INormalizedKeyComputerFactory nmkFactory;
+    protected final IBinaryComparatorFactory[] comparatorFactories;
+    protected final RecordDescriptor recordDescriptor;
+    // Created in open().
+    protected ITupleSorter tupleSorter;
+    // Reused for every frame handed to nextFrame().
+    protected IFrameTupleAccessor inAccessor;
+
+    /**
+     * @param ctx
+     *            task context (frame size, workspace files, I/O manager)
+     * @param frameLimit
+     *            frame budget; the tuple buffer gets {@code frameLimit - 1} frames
+     * @param topK
+     *            forwarded to {@link TupleSorterHeapSort}
+     * @param sortFields
+     *            indexes of the fields to sort on
+     * @param firstKeyNormalizerFactory
+     *            normalized-key computer factory for the first sort field
+     * @param comparatorFactories
+     *            comparator factories for the sort fields
+     * @param recordDescriptor
+     *            layout of the incoming tuples
+     */
+    public HeapSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        this.ctx = ctx;
+        this.frameLimit = frameLimit;
+        this.topK = topK;
+        this.sortFields = sortFields;
+        this.nmkFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+        this.recordDescriptor = recordDescriptor;
+        this.inAccessor = new FrameTupleAccessor(recordDescriptor);
+    }
+
+    /** Creates the bounded tuple buffer and the heap sorter, then opens the base generator. */
+    @Override
+    public void open() throws HyracksDataException {
+        int budgetInBytes = (frameLimit - 1) * ctx.getInitialFrameSize();
+        IFramePool pool = new VariableFramePool(ctx, budgetInBytes);
+        ITupleBufferManager tupleBuffers = new VariableTupleMemoryManager(pool, recordDescriptor);
+        tupleSorter = new TupleSorterHeapSort(ctx, tupleBuffers, topK, sortFields, nmkFactory,
+                comparatorFactories);
+        super.open();
+    }
+
+    @Override
+    public ISorter getSorter() throws HyracksDataException {
+        return tupleSorter;
+    }
+
+    @Override
+    protected RunFileWriter getRunFileWriter() throws HyracksDataException {
+        FileReference runFile = ctx.getJobletContext()
+                .createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
+        return new RunFileWriter(runFile, ctx.getIOManager());
+    }
+
+    /** The run file writer is used directly as the flush target. */
+    @Override
+    protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
+        return writer;
+    }
+
+    /**
+     * Inserts every tuple of {@code buffer} into the tuple sorter, calling {@code flushFramesToRun()} and retrying
+     * once whenever a tuple is rejected.
+     *
+     * @throws HyracksDataException
+     *             if a tuple is still rejected right after the flush
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inAccessor.reset(buffer);
+        int tupleCount = inAccessor.getTupleCount();
+        for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
+            if (tupleSorter.insertTuple(inAccessor, tupleIndex)) {
+                continue;
+            }
+            flushFramesToRun();
+            if (!tupleSorter.insertTuple(inAccessor, tupleIndex)) {
+                throw new HyracksDataException("The given tuple is too big to insert into the sorting memory.");
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
new file mode 100644
index 0000000..9976aad
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/HybridTopKSortRunGenerator.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotBiggestFirst;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
+
+/**
+ * Top-K run generator that starts with the {@link TupleSorterHeapSort} created by {@link HeapSortRunGenerator}
+ * and switches to a {@link FrameSorterMergeSort} (built with {@code topK} as its last argument) once more than
+ * {@code SWITCH_TO_FRAME_SORTER_THRESHOLD} input frames have forced the tuple sorter to flush a run.
+ */
+public class HybridTopKSortRunGenerator extends HeapSortRunGenerator {
+    private static final Logger LOG = Logger.getLogger(HybridTopKSortRunGenerator.class.getName());
+
+    private static final int SWITCH_TO_FRAME_SORTER_THRESHOLD = 2;
+    // Set when the tuple sorter is abandoned (see switchToFrameSorter()); null until then.
+    private IFrameSorter frameSorter = null;
+    // Number of input frames whose insertion forced the tuple sorter to flush a run.
+    private int tupleSorterFlushedTimes = 0;
+
+    public HybridTopKSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int topK, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super(ctx, frameLimit, topK, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor);
+    }
+
+    /**
+     * @return the tuple sorter while it is active, otherwise the frame sorter; {@code null} only if neither has
+     *         been created yet (i.e. before {@link #open()})
+     */
+    @Override
+    public ISorter getSorter() throws HyracksDataException {
+        if (tupleSorter != null) {
+            return tupleSorter;
+        } else if (frameSorter != null) {
+            return frameSorter;
+        }
+        return null;
+    }
+
+    @Override
+    protected RunFileWriter getRunFileWriter() throws HyracksDataException {
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                HybridTopKSortRunGenerator.class.getSimpleName());
+        return new RunFileWriter(file, ctx.getIOManager());
+    }
+
+    /**
+     * Feeds {@code buffer} to the active sorter. While the tuple sorter is active, tuples are inserted one by one
+     * and a run is flushed whenever a tuple is rejected; after too many such frames the generator switches to the
+     * frame sorter, which then receives whole frames.
+     *
+     * @throws HyracksDataException
+     *             if a tuple or frame is still rejected right after a flush
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inAccessor.reset(buffer);
+        if (tupleSorter != null) {
+            boolean flushedForThisFrame = false;
+            for (int i = 0; i < inAccessor.getTupleCount(); i++) {
+                if (!tupleSorter.insertTuple(inAccessor, i)) {
+                    flushFramesToRun();
+                    flushedForThisFrame = true;
+                    if (!tupleSorter.insertTuple(inAccessor, i)) {
+                        throw new HyracksDataException("The given tuple is too big to insert into the sorting memory.");
+                    }
+                }
+            }
+            if (flushedForThisFrame) {
+                tupleSorterFlushedTimes++;
+                if (tupleSorterFlushedTimes > SWITCH_TO_FRAME_SORTER_THRESHOLD) {
+                    switchToFrameSorter();
+                }
+            }
+        } else {
+            if (frameSorter == null) {
+                // Defensive: switchToFrameSorter() normally creates the frame sorter before we get here.
+                frameSorter = createFrameSorter();
+            }
+            if (!frameSorter.insertFrame(buffer)) {
+                flushFramesToRun();
+                if (!frameSorter.insertFrame(buffer)) {
+                    throw new HyracksDataException("The given frame is too big to insert into the sorting memory.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Flushes what the tuple sorter still holds, closes it and creates the frame sorter immediately, so that
+     * {@link #getSorter()} does not return {@code null} between the switch and the next input frame (e.g. if the
+     * switch happens on the last input frame).
+     */
+    private void switchToFrameSorter() throws HyracksDataException {
+        if (tupleSorter.hasRemaining()) {
+            flushFramesToRun();
+        }
+        tupleSorter.close();
+        tupleSorter = null;
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("clear tupleSorter");
+        }
+        frameSorter = createFrameSorter();
+    }
+
+    /** Builds the merge-sort frame sorter over {@code frameLimit - 1} frames of memory. */
+    private IFrameSorter createFrameSorter() throws HyracksDataException {
+        VariableFrameMemoryManager bufferManager = new VariableFrameMemoryManager(
+                new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize()),
+                new FrameFreeSlotBiggestFirst(frameLimit - 1));
+        IFrameSorter sorter = new FrameSorterMergeSort(ctx, bufferManager, sortFields, nmkFactory,
+                comparatorFactories, recordDescriptor, topK);
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("create frameSorter");
+        }
+        return sorter;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
index d9b8d37..6d0b100 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IFrameSorter.java
@@ -1,37 +1,28 @@
/*
* Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public interface IFrameSorter {
+public interface IFrameSorter extends ISorter {
- public void reset();
+ int getFrameCount();
- public int getFrameCount();
-
- public void insertFrame(ByteBuffer buffer) throws HyracksDataException;
-
- public void sortFrames() throws HyracksDataException;
-
- public void flushFrames(IFrameWriter writer) throws HyracksDataException;
-
- public void close();
+ boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
deleted file mode 100644
index e669335..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * @author pouria Defines the required operations, needed for any memory
- * manager, used in sorting with replacement selection, to manage the
- * free spaces
- */
-
-public interface IMemoryManager {
-
- /**
- * Allocates a free slot equal or greater than requested length. Pointer to
- * the allocated slot is put in result, and gets returned to the caller. If
- * no proper free slot is available, result would contain a null/invalid
- * pointer (may vary between different implementations)
- *
- * @param length
- * @param result
- * @throws HyracksDataException
- */
- void allocate(int length, Slot result) throws HyracksDataException;
-
- /**
- * Unallocates the specified slot (and returns it back to the free slots
- * set)
- *
- * @param s
- * @return the total length of unallocted slot
- * @throws HyracksDataException
- */
- int unallocate(Slot s) throws HyracksDataException;
-
- /**
- * @param frameIndex
- * @return the specified frame, from the set of memory buffers, being
- * managed by this memory manager
- */
- ByteBuffer getFrame(int frameIndex);
-
- /**
- * Writes the specified tuple into the specified memory slot (denoted by
- * frameIx and offset)
- *
- * @param frameIx
- * @param offset
- * @param src
- * @param tIndex
- * @return
- */
- boolean writeTuple(int frameIx, int offset, FrameTupleAccessor src, int tIndex);
-
- /**
- * Reads the specified tuple (denoted by frameIx and offset) and appends it
- * to the passed FrameTupleAppender
- *
- * @param frameIx
- * @param offset
- * @param dest
- * @return
- */
- boolean readTuple(int frameIx, int offset, FrameTupleAppender dest);
-
- /**
- * close and cleanup the memory manager
- */
- void close();
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
index 2840d01..d21255e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -16,7 +16,6 @@ package edu.uci.ics.hyracks.dataflow.std.sort;
import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
/**
@@ -28,5 +27,5 @@ public interface IRunGenerator extends IFrameWriter {
/**
* @return the list of generated (sorted) runs
*/
- public List<IFrameReader> getRuns();
+ List<RunAndMaxFrameSizePair> getRuns();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
deleted file mode 100644
index 8cff0df..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * @author pouria
- * Defines the selection tree, used in sorting with replacement
- * selection to manage the order of output tuples into the runs, during
- * the run generation phase. This tree contains tuples, belonging to two
- * different runs: - Current run (being written to the output) - Next
- * run
- */
-
-public interface ISelectionTree {
-
- /**
- * Inserts a new element into the selectionTree
- *
- * @param element
- * contains the pointer to the memory slot, containing the tuple,
- * along with its run number
- * @throws HyracksDataException
- */
- void insert(int[] element) throws HyracksDataException;
-
- /**
- * Removes and returns the smallest element in the tree
- *
- * @param result
- * is the array that will eventually contain minimum entry
- * pointer
- * @throws HyracksDataException
- */
- void getMin(int[] result) throws HyracksDataException;
-
- /**
- * Removes and returns the largest element in the tree
- *
- * @param result
- * is the array that will eventually contain maximum entry
- * pointer
- * @throws HyracksDataException
- */
- void getMax(int[] result) throws HyracksDataException;
-
- /**
- * @return True of the selection tree does not have any element, false
- * otherwise
- */
- boolean isEmpty();
-
- /**
- * Removes all the elements in the tree
- */
- void reset();
-
- /**
- * Returns (and does NOT remove) the smallest element in the tree
- *
- * @param result
- * is the array that will eventually contain minimum entry
- * pointer
- */
- void peekMin(int[] result);
-
- /**
- * Returns (and does NOT remove) the largest element in the tree
- *
- * @param result
- * is the array that will eventually contain maximum entry
- * pointer
- * @throws HyracksDataException
- */
- void peekMax(int[] result) throws HyracksDataException;
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java
new file mode 100644
index 0000000..09a8169
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISorter.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Operations shared by the frame-based and tuple-based sorters ({@link IFrameSorter}, {@link ITupleSorter}).
+ */
+public interface ISorter {
+
+    /**
+     * @return whether the sorter still holds data (e.g. checked before spilling it to a run)
+     */
+    boolean hasRemaining();
+
+    /**
+     * Resets the sorter. NOTE(review): whether buffered data is discarded is implementation-defined — confirm per
+     * implementation.
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * Sorts the data currently held by the sorter; callers invoke it before {@link #flush(IFrameWriter)}.
+     */
+    void sort() throws HyracksDataException;
+
+    /**
+     * Closes the sorter (presumably releasing its buffers); callers drop the reference afterwards.
+     */
+    void close();
+
+    /**
+     * Writes the held data to {@code writer}.
+     *
+     * @return an implementation-defined count of what was written (TODO confirm: tuples or frames)
+     */
+    int flush(IFrameWriter writer) throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java
new file mode 100644
index 0000000..bea8b35
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ITupleSorter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link ISorter} that accepts its input one tuple at a time.
+ */
+public interface ITupleSorter extends ISorter {
+    /**
+     * @return the number of tuples currently held by the sorter (NOTE(review): confirm whether this counts every
+     *         inserted tuple or only the retained ones)
+     */
+    int getTupleCount();
+
+    /**
+     * Inserts tuple {@code index} of {@code frameTupleAccessor} into the sorter.
+     *
+     * @return {@code false} if the tuple could not be accommodated; callers then flush a run and retry
+     */
+    boolean insertTuple(IFrameTupleAccessor frameTupleAccessor, int index) throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 6fa21b5..379a783 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -36,6 +36,10 @@ import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -76,7 +80,7 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
builder.addBlockingEdge(sa, ma);
}
- public static class SortTaskState extends AbstractStateObject {
+ private static class SortTaskState extends AbstractStateObject {
private FrameSorterMergeSort frameSorter;
public SortTaskState() {
@@ -110,20 +114,29 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
@Override
public void open() throws HyracksDataException {
- state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
- state.frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0]);
+ state = new SortTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
+
+ IFrameBufferManager frameBufferManager = new VariableFrameMemoryManager(
+ new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), new FrameFreeSlotLastFit());
+
+ state.frameSorter = new FrameSorterMergeSort(ctx, frameBufferManager, sortFields,
+ firstKeyNormalizerFactory, comparatorFactories, recordDescriptors[0]);
state.frameSorter.reset();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.frameSorter.insertFrame(buffer);
+ if (!state.frameSorter.insertFrame(buffer)) {
+ throw new HyracksDataException("Failed to insert the given frame into sorting buffer. "
+ + "Please increase the sorting memory budget to enable the in-memory sorting, "
+ + "or you could use ExternalSort instead.");
+ }
}
@Override
public void close() throws HyracksDataException {
- state.frameSorter.sortFrames();
+ state.frameSorter.sort();
ctx.setStateObject(state);
}
@@ -152,7 +165,7 @@ public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
try {
SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), SORT_ACTIVITY_ID), partition));
- state.frameSorter.flushFrames(writer);
+ state.frameSorter.flush(writer);
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
deleted file mode 100644
index ef1ae88..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.sort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * @author pouria
- * Operator descriptor for sorting with replacement, consisting of two
- * phases:
- * - Run Generation: Denoted by OptimizedSortActivity below, in which
- * sort runs get generated from the input data. This phases uses the
- * Selection Tree and Memory Manager to benefit from the replacement
- * selection optimization, to create runs which are longer than the
- * available memory size.
- * - Merging: Denoted by MergeActivity below, in which runs (generated
- * in the previous phase) get merged via a merger. Each run has a single
- * buffer in memory, and a priority queue is used to select the top
- * tuple each time. Top tuple is send to a new run or output
- */
-
-public class OptimizedExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
-
- private static final int NO_LIMIT = -1;
- private static final long serialVersionUID = 1L;
- private static final int SORT_ACTIVITY_ID = 0;
- private static final int MERGE_ACTIVITY_ID = 1;
-
- private final int[] sortFields;
- private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final int memSize;
- private final int outputLimit;
-
- public OptimizedExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
- this(spec, framesLimit, NO_LIMIT, sortFields, null, comparatorFactories, recordDescriptor);
- }
-
- public OptimizedExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int outputLimit,
- int[] sortFields, IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
- this(spec, framesLimit, outputLimit, sortFields, null, comparatorFactories, recordDescriptor);
- }
-
- public OptimizedExternalSortOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSize, int outputLimit,
- int[] sortFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
- super(spec, 1, 1);
- this.memSize = memSize;
- this.outputLimit = outputLimit;
- this.sortFields = sortFields;
- this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
- this.comparatorFactories = comparatorFactories;
- if (memSize <= 1) {
- throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
- }
- recordDescriptors[0] = recordDescriptor;
- }
-
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
- OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
-
- builder.addActivity(this, osa);
- builder.addSourceEdge(0, osa, 0);
-
- builder.addActivity(this, oma);
- builder.addTargetEdge(0, oma, 0);
-
- builder.addBlockingEdge(osa, oma);
- }
-
- public static class OptimizedSortTaskState extends AbstractStateObject {
- private List<IFrameReader> runs;
-
- public OptimizedSortTaskState() {
- }
-
- private OptimizedSortTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
- }
-
- private class OptimizedSortActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public OptimizedSortActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- final IRunGenerator runGen;
- if (outputLimit == NO_LIMIT) {
- runGen = new OptimizedExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0], memSize);
- } else {
- runGen = new OptimizedExternalSortRunGeneratorWithLimit(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0], memSize, outputLimit);
- }
-
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- @Override
- public void open() throws HyracksDataException {
-
- runGen.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- runGen.nextFrame(buffer);
- }
-
- @Override
- public void close() throws HyracksDataException {
- OptimizedSortTaskState state = new OptimizedSortTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));
- runGen.close();
- state.runs = runGen.getRuns();
- ctx.setStateObject(state);
-
- }
-
- @Override
- public void fail() throws HyracksDataException {
- runGen.fail();
- }
- };
- return op;
- }
- }
-
- private class OptimizedMergeActivity extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- public OptimizedMergeActivity(ActivityId id) {
- super(id);
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(
- new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
-
- List<IFrameReader> runs = state.runs;
-
- IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
- : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- int necessaryFrames = Math.min(runs.size() + 2, memSize);
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
- comparators, nmkComputer, recordDescriptors[0], necessaryFrames, writer);
-
- merger.processWithReplacementSelection();
-
- }
- };
- return op;
- }
- }
-}
\ No newline at end of file