You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:26 UTC
[09/14] incubator-asterixdb-hyracks git commit:
VariableSizeFrame(VSizeFrame) support for Hyracks.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 506da2e..ed25b4f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -19,7 +19,9 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
@@ -74,7 +76,7 @@ public class OptimizedHybridHashJoin {
private final boolean isLeftOuter;
private final INullWriter[] nullWriters1;
- private ByteBuffer[] memBuffs; //Memory buffers for build
+ private IFrame[] memBuffs; //Memory buffers for build
private int[] curPBuff; //Current (last) Buffer for each partition
private int[] nextBuff; //Next buffer in the partition's buffer chain
private int[] buildPSizeInTups; //Size of build partitions (in tuples)
@@ -92,9 +94,9 @@ public class OptimizedHybridHashJoin {
private FrameTupleAppender probeTupAppenderToSpilled;
private int numOfSpilledParts;
- private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
- private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
- private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
+ private IFrame[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
+ private IFrame probeResBuff; //Buffer for probe resident partition tuples
+ private IFrame reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
private int[] buildPSizeInFrames; //Used for partition tuning
private int freeFramesCounter; //Used for partition tuning
@@ -124,8 +126,8 @@ public class OptimizedHybridHashJoin {
this.buildRFWriters = new RunFileWriter[numOfPartitions];
this.probeRFWriters = new RunFileWriter[numOfPartitions];
- this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
- this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+ this.accessorBuild = new FrameTupleAccessor(buildRd);
+ this.accessorProbe = new FrameTupleAccessor(probeRd);
this.predEvaluator = predEval;
this.isLeftOuter = false;
@@ -154,8 +156,8 @@ public class OptimizedHybridHashJoin {
this.buildRFWriters = new RunFileWriter[numOfPartitions];
this.probeRFWriters = new RunFileWriter[numOfPartitions];
- this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
- this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+ this.accessorBuild = new FrameTupleAccessor(buildRd);
+ this.accessorProbe = new FrameTupleAccessor(probeRd);
this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
@@ -170,7 +172,7 @@ public class OptimizedHybridHashJoin {
}
public void initBuild() throws HyracksDataException {
- memBuffs = new ByteBuffer[memForJoin];
+ memBuffs = new IFrame[memForJoin];
curPBuff = new int[numOfPartitions];
nextBuff = new int[memForJoin];
pStatus = new BitSet(numOfPartitions);
@@ -179,19 +181,22 @@ public class OptimizedHybridHashJoin {
buildPSizeInFrames = new int[numOfPartitions];
freeFramesCounter = memForJoin - numOfPartitions;
- for (int i = 0; i < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
- memBuffs[i] = ctx.allocateFrame();
+ for (int i = 0; i
+ < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
+ memBuffs[i] = new VSizeFrame(ctx);
curPBuff[i] = i;
nextBuff[i] = -1;
buildPSizeInFrames[i] = 1; //The dedicated initial buffer
}
- nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
+ nextFreeBuffIx = ((numOfPartitions < memForJoin) ?
+ numOfPartitions :
+ NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
for (int i = numOfPartitions; i < memBuffs.length; i++) {
nextBuff[i] = UNALLOCATED_FRAME;
}
- buildTupAppender = new FrameTupleAppender(ctx.getFrameSize());
+ buildTupAppender = new FrameTupleAppender();
}
@@ -213,7 +218,7 @@ public class OptimizedHybridHashJoin {
}
private void processTuple(int tid, int pid) throws HyracksDataException {
- ByteBuffer partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
+ IFrame partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
if (!pStatus.get(pid)) { //resident partition
buildTupAppender.reset(partition, false);
@@ -226,7 +231,8 @@ public class OptimizedHybridHashJoin {
if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
int pidToSpill = selectPartitionToSpill();
if (pidToSpill == -1) { //No more partition to spill
- throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
+ throw new HyracksDataException(
+ "not enough memory for Hash Join (Allocation exceeds the limit)");
}
spillPartition(pidToSpill);
buildTupAppender.reset(memBuffs[pidToSpill], true);
@@ -249,8 +255,8 @@ public class OptimizedHybridHashJoin {
break;
}
//Dedicated in-memory buffer for the partition is full, needed to be flushed first
- buildWrite(pid, partition);
- partition.clear();
+ buildWrite(pid, partition.getBuffer());
+ partition.reset();
needClear = true;
buildPSizeInFrames[pid]++;
}
@@ -260,7 +266,7 @@ public class OptimizedHybridHashJoin {
private int allocateFreeBuffer(int pid) throws HyracksDataException {
if (nextFreeBuffIx != NO_MORE_FREE_BUFFER) {
if (memBuffs[nextFreeBuffIx] == null) {
- memBuffs[nextFreeBuffIx] = ctx.allocateFrame();
+ memBuffs[nextFreeBuffIx] = new VSizeFrame(ctx);
}
int curPartBuffIx = curPBuff[pid];
curPBuff[pid] = nextFreeBuffIx;
@@ -274,7 +280,7 @@ public class OptimizedHybridHashJoin {
} else {
nextFreeBuffIx = oldNext;
}
- (memBuffs[curPBuff[pid]]).clear();
+ memBuffs[curPBuff[pid]].reset();
freeFramesCounter--;
return (curPBuff[pid]);
@@ -300,11 +306,10 @@ public class OptimizedHybridHashJoin {
+ " frames for Thread ID " + Thread.currentThread().getId() + " (free frames: " + freeFramesCounter
+ ").");
int curBuffIx = curPBuff[pid];
- ByteBuffer buff = null;
while (curBuffIx != END_OF_PARTITION) {
- buff = memBuffs[curBuffIx];
- buildWrite(pid, buff);
- buff.clear();
+ IFrame frame = memBuffs[curBuffIx];
+ buildWrite(pid, frame.getBuffer());
+ frame.reset();
int freedBuffIx = curBuffIx;
curBuffIx = nextBuff[curBuffIx];
@@ -346,8 +351,9 @@ public class OptimizedHybridHashJoin {
}
ByteBuffer buff = null;
- for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
- buff = memBuffs[i];
+ for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus
+ .nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
+ buff = memBuffs[i].getBuffer();
accessorBuild.reset(buff);
if (accessorBuild.getTupleCount() > 0) {
buildWrite(i, buff);
@@ -389,7 +395,7 @@ public class OptimizedHybridHashJoin {
}
private void partitionTune() throws HyracksDataException {
- reloadBuffer = ctx.allocateFrame();
+ reloadBuffer = new VSizeFrame(ctx);
ArrayList<Integer> reloadSet = selectPartitionsToReload();
for (int i = 0; i < reloadSet.size(); i++) {
int pid = reloadSet.get(i);
@@ -414,7 +420,6 @@ public class OptimizedHybridHashJoin {
loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
}
reloadSet.clear();
- reloadSet = null;
}
private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
@@ -422,16 +427,16 @@ public class OptimizedHybridHashJoin {
r.open();
int counter = 0;
ByteBuffer mBuff = null;
- reloadBuffer.clear();
+ reloadBuffer.reset();
while (r.nextFrame(reloadBuffer)) {
- mBuff = memBuffs[buffs[counter]];
- if (mBuff == null) {
- mBuff = ctx.allocateFrame();
- memBuffs[buffs[counter]] = mBuff;
+ if (memBuffs[buffs[counter]] == null) {
+ memBuffs[buffs[counter]] = new VSizeFrame(ctx);
}
- FrameUtils.copy(reloadBuffer, mBuff);
+ memBuffs[buffs[counter]].ensureFrameSize(reloadBuffer.getFrameSize());
+ mBuff = memBuffs[buffs[counter]].getBuffer();
+ FrameUtils.copyAndFlip(reloadBuffer.getBuffer(), mBuff);
counter++;
- reloadBuffer.clear();
+ reloadBuffer.reset();
}
int curNext = nextBuff[buffs[buffs.length - 1]];
@@ -459,10 +464,10 @@ public class OptimizedHybridHashJoin {
private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
- this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
- new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
- ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
- comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
+ this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc,
+ new FrameTupleAccessor(buildRd), buildHpc,
+ new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nullWriters1, table,
+ predEvaluator, isReversed);
}
private void cacheInMemJoin() throws HyracksDataException {
@@ -471,7 +476,7 @@ public class OptimizedHybridHashJoin {
if (!pStatus.get(pid)) {
int nextBuffIx = curPBuff[pid];
while (nextBuffIx > -1) { //It is not Invalid or End_Of_Partition
- inMemJoiner.build(memBuffs[nextBuffIx]);
+ inMemJoiner.build(memBuffs[nextBuffIx].getBuffer());
nextBuffIx = nextBuff[nextBuffIx];
}
}
@@ -480,9 +485,9 @@ public class OptimizedHybridHashJoin {
public void initProbe() throws HyracksDataException {
- sPartBuffs = new ByteBuffer[numOfSpilledParts];
+ sPartBuffs = new IFrame[numOfSpilledParts];
for (int i = 0; i < numOfSpilledParts; i++) {
- sPartBuffs[i] = ctx.allocateFrame();
+ sPartBuffs[i] = new VSizeFrame(ctx);
}
curPBuff = new int[numOfPartitions];
int nextBuffIxToAlloc = 0;
@@ -495,12 +500,12 @@ public class OptimizedHybridHashJoin {
probePSizeInTups = new int[numOfPartitions];
probeRFWriters = new RunFileWriter[numOfPartitions];
- probeResBuff = ctx.allocateFrame();
+ probeResBuff = new VSizeFrame(ctx);
- probeTupAppenderToResident = new FrameTupleAppender(ctx.getFrameSize());
+ probeTupAppenderToResident = new FrameTupleAppender();
probeTupAppenderToResident.reset(probeResBuff, true);
- probeTupAppenderToSpilled = new FrameTupleAppender(ctx.getFrameSize());
+ probeTupAppenderToSpilled = new FrameTupleAppender();
}
@@ -517,21 +522,20 @@ public class OptimizedHybridHashJoin {
inMemJoiner.join(buffer, writer);
return;
}
- ByteBuffer buff = null;
for (int i = 0; i < tupleCount; ++i) {
int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
- if (buildPSizeInTups[pid] > 0) { //Tuple has potential match from previous phase
+ if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
if (pStatus.get(pid)) { //pid is Spilled
boolean needToClear = false;
- buff = sPartBuffs[curPBuff[pid]];
+ IFrame frame = sPartBuffs[curPBuff[pid]];
while (true) {
- probeTupAppenderToSpilled.reset(buff, needToClear);
+ probeTupAppenderToSpilled.reset(frame, needToClear);
if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
break;
}
- probeWrite(pid, buff);
- buff.clear();
+ probeWrite(pid, frame.getBuffer());
+ frame.reset();
needToClear = true;
}
} else { //pid is Resident
@@ -539,7 +543,7 @@ public class OptimizedHybridHashJoin {
if (probeTupAppenderToResident.append(accessorProbe, i)) {
break;
}
- inMemJoiner.join(probeResBuff, writer);
+ inMemJoiner.join(probeResBuff.getBuffer(), writer);
probeTupAppenderToResident.reset(probeResBuff, true);
}
@@ -551,13 +555,13 @@ public class OptimizedHybridHashJoin {
}
- public void closeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
- inMemJoiner.join(probeResBuff, writer);
+ public void closeProbe(IFrameWriter writer) throws
+ HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+ inMemJoiner.join(probeResBuff.getBuffer(), writer);
inMemJoiner.closeJoin(writer);
- ByteBuffer buff = null;
for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
- buff = sPartBuffs[curPBuff[pid]];
+ ByteBuffer buff = sPartBuffs[curPBuff[pid]].getBuffer();
accessorProbe.reset(buff);
if (accessorProbe.getTupleCount() > 0) {
probeWrite(pid, buff);
@@ -651,10 +655,10 @@ public class OptimizedHybridHashJoin {
double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
- String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
- + numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
- + avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
- + freeFramesCounter;
+ String s =
+ "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t" + numOfSpilledPartitions
+ + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t" + avgProbeSpSz
+ + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t" + freeFramesCounter;
return s;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 0494288..840eb75 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -368,7 +370,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
- private ByteBuffer rPartbuff = ctx.allocateFrame();
+ private IFrame rPartbuff = new VSizeFrame(ctx);
@Override
public void open() throws HyracksDataException {
@@ -397,13 +399,14 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
- rPartbuff.clear();
+ rPartbuff.reset();
for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
- if (bReader == null || pReader == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
+ if (bReader == null || pReader
+ == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
continue;
}
int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
@@ -423,10 +426,14 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
- long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj
- .getBuildPartitionSize(pid) / ctx.getFrameSize());
- long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
- .getProbePartitionSize(pid) / ctx.getFrameSize());
+ long buildPartSize = wasReversed ?
+ (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize()) :
+ (ohhj
+ .getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
+ long probePartSize = wasReversed ?
+ (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize()) :
+ (ohhj
+ .getProbePartitionSize(pid) / ctx.getInitialFrameSize());
LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
+ pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
@@ -437,7 +444,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
|| (probePartSize < state.memForJoin && !isLeftOuter)) {
int tabSize = -1;
- if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+ if (!forceRR && (isLeftOuter || (buildPartSize
+ < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
+ level + "]");
tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj
@@ -450,8 +458,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc,
buildSideReader, probeSideReader, false, pid); //checked-confirmed
} else { //Case 1.2 - InMemHJ with Role Reversal
- LOGGER.fine("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
- + level + "]");
+ LOGGER.fine(
+ "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
+ + level + "]");
tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj
.getProbePartitionSizeInTup(pid);
if (tabSize == 0) {
@@ -467,7 +476,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
else {
LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
OptimizedHybridHashJoin rHHj;
- if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+ if (!forceRR && (isLeftOuter
+ || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ level + "]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
@@ -478,18 +488,18 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
buildSideReader.open();
rHHj.initBuild();
- rPartbuff.clear();
+ rPartbuff.reset();
while (buildSideReader.nextFrame(rPartbuff)) {
- rHHj.build(rPartbuff);
+ rHHj.build(rPartbuff.getBuffer());
}
rHHj.closeBuild();
probeSideReader.open();
rHHj.initProbe();
- rPartbuff.clear();
+ rPartbuff.reset();
while (probeSideReader.nextFrame(rPartbuff)) {
- rHHj.probe(rPartbuff, writer);
+ rHHj.probe(rPartbuff.getBuffer(), writer);
}
rHHj.closeProbe(writer);
@@ -499,10 +509,13 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitionStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
- LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
- + level + "]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
+ * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+ LOGGER.fine(
+ "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ + level + "]");
+ for (int rPid = rPStatus.nextSetBit(0);
+ rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -510,13 +523,16 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
continue;
}
- joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
+ joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1),
+ false); //checked-confirmed
}
} else { //Case 2.1.2 - Switch to NLJ
- LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
- + level + "]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ LOGGER.fine(
+ "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ + level + "]");
+ for (int rPid = rPStatus.nextSetBit(0);
+ rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -547,16 +563,16 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
probeSideReader.open();
rHHj.initBuild();
- rPartbuff.clear();
+ rPartbuff.reset();
while (probeSideReader.nextFrame(rPartbuff)) {
- rHHj.build(rPartbuff);
+ rHHj.build(rPartbuff.getBuffer());
}
rHHj.closeBuild();
rHHj.initProbe();
buildSideReader.open();
- rPartbuff.clear();
+ rPartbuff.reset();
while (buildSideReader.nextFrame(rPartbuff)) {
- rHHj.probe(rPartbuff, writer);
+ rHHj.probe(rPartbuff.getBuffer(), writer);
}
rHHj.closeProbe(writer);
int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
@@ -565,10 +581,12 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitionStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
+ * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
+ level + "]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ for (int rPid = rPStatus.nextSetBit(0);
+ rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -576,12 +594,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
continue;
}
- joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
+ joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1),
+ true); //checked-confirmed
}
} else { //Case 2.2.2 - Switch to NLJ
- LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
- + level + "]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ LOGGER.fine(
+ "\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
+ + level + "]");
+ for (int rPid = rPStatus.nextSetBit(0);
+ rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -611,27 +632,27 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader,
boolean reverse, int pid) throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
- ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
- buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
- isLeftOuter, nullWriters1, table, predEvaluator, reverse);
+ InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc),
+ hpcRepLarger, new FrameTupleAccessor(buildRDesc), hpcRepSmaller,
+ new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table,
+ predEvaluator, reverse);
bReader.open();
- rPartbuff.clear();
+ rPartbuff.reset();
while (bReader.nextFrame(rPartbuff)) {
- ByteBuffer copyBuffer = ctx.allocateFrame(); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
- FrameUtils.copy(rPartbuff, copyBuffer);
- FrameUtils.makeReadable(copyBuffer);
+ ByteBuffer copyBuffer = ctx
+ .allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+ FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
joiner.build(copyBuffer);
- rPartbuff.clear();
+ rPartbuff.reset();
}
bReader.close();
- rPartbuff.clear();
+ rPartbuff.reset();
// probe
pReader.open();
while (pReader.nextFrame(rPartbuff)) {
- joiner.join(rPartbuff, writer);
- rPartbuff.clear();
+ joiner.join(rPartbuff.getBuffer(), writer);
+ rPartbuff.reset();
}
pReader.close();
joiner.closeJoin(writer);
@@ -640,27 +661,26 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
boolean reverse) throws HyracksDataException {
- NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize,
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx,
+ new FrameTupleAccessor(outerRd),
+ new FrameTupleAccessor(innerRd), nljComparator, memorySize,
predEvaluator, isLeftOuter, nullWriters1);
nlj.setIsReversed(reverse);
- ByteBuffer cacheBuff = ctx.allocateFrame();
+ IFrame cacheBuff = new VSizeFrame(ctx);
innerReader.open();
while (innerReader.nextFrame(cacheBuff)) {
- FrameUtils.makeReadable(cacheBuff);
- nlj.cache(cacheBuff);
- cacheBuff.clear();
+ nlj.cache(cacheBuff.getBuffer());
+ cacheBuff.reset();
}
nlj.closeCache();
- ByteBuffer joinBuff = ctx.allocateFrame();
+ IFrame joinBuff = new VSizeFrame(ctx);
outerReader.open();
while (outerReader.nextFrame(joinBuff)) {
- FrameUtils.makeReadable(joinBuff);
- nlj.join(joinBuff, writer);
- joinBuff.clear();
+ nlj.join(joinBuff.getBuffer(), writer);
+ joinBuff.reset();
}
nlj.closeJoin(writer);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 12f319f..ffe3abd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -15,12 +15,10 @@
package edu.uci.ics.hyracks.dataflow.std.misc;
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
@@ -41,14 +39,12 @@ public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutput
@Override
public void initialize() throws HyracksDataException {
- ByteBuffer writeBuffer = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(writeBuffer, true);
+ FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
if (fieldSlots != null && tupleData != null && tupleSize > 0)
appender.append(fieldSlots, tupleData, 0, tupleSize);
writer.open();
try {
- FrameUtils.flushFrame(writeBuffer, writer);
+ appender.flush(writer, false);
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
index 1fff4fe..4356181 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -16,6 +16,7 @@ package edu.uci.ics.hyracks.dataflow.std.misc;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -50,7 +51,7 @@ public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescr
@Override
public void open() throws HyracksDataException {
- fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptors[0]);
+ fta = new FrameTupleAccessor(recordDescriptors[0]);
currentSize = 0;
finished = false;
writer.open();
@@ -62,16 +63,13 @@ public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescr
fta.reset(buffer);
int count = fta.getTupleCount();
if ((currentSize + count) > outputLimit) {
- ByteBuffer b = ctx.allocateFrame();
- FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
- partialAppender.reset(b, true);
+ FrameTupleAppender partialAppender = new FrameTupleAppender(new VSizeFrame(ctx));
int copyCount = outputLimit - currentSize;
for (int i = 0; i < copyCount; i++) {
- partialAppender.append(fta, i);
+ FrameUtils.appendToWriter(writer, partialAppender, fta, i);
currentSize++;
}
- FrameUtils.makeReadable(b);
- FrameUtils.flushFrame(b, writer);
+ partialAppender.flush(writer,false);
finished = true;
} else {
FrameUtils.flushFrame(buffer, writer);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 48de837..9339b34 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -19,6 +19,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -27,6 +28,7 @@ import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
public class MaterializerTaskState extends AbstractStateObject {
@@ -61,15 +63,13 @@ public class MaterializerTaskState extends AbstractStateObject {
out.nextFrame(buffer);
}
- public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
+ public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
RunFileReader in = out.createReader();
writer.open();
try {
in.open();
while (in.nextFrame(frame)) {
- frame.flip();
- writer.nextFrame(frame);
- frame.clear();
+ writer.nextFrame(frame.getBuffer());
}
in.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 3a405d0..36fdd50 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -16,6 +16,7 @@ package edu.uci.ics.hyracks.dataflow.std.misc;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -25,6 +26,8 @@ import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -107,8 +110,7 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
@Override
public void close() throws HyracksDataException {
state.close();
- ByteBuffer frame = ctx.allocateFrame();
- state.writeOut(writer, frame);
+ state.writeOut(writer, new VSizeFrame(ctx));
}
};
@@ -166,10 +168,9 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- ByteBuffer frame = ctx.allocateFrame();
MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
- state.writeOut(writer, frame);
+ state.writeOut(writer, new VSizeFrame(ctx));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index b8e1ac8..1f6d965 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -17,6 +17,7 @@ package edu.uci.ics.hyracks.dataflow.std.misc;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -172,10 +173,9 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
@Override
public void initialize() throws HyracksDataException {
- ByteBuffer frame = ctx.allocateFrame();
MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
- state.writeOut(writer, frame);
+ state.writeOut(writer, new VSizeFrame(ctx));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 42ed59e..3a6b5d2 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -18,7 +18,9 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrame;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -61,17 +63,17 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
throws HyracksDataException {
final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
- final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final IFrame frame = new VSizeFrame(ctx);
- final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
- frameOutputStream.reset(outputBuffer, true);
+ final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getInitialFrameSize());
+ frameOutputStream.reset(frame, true);
PrintStream printStream = new PrintStream(frameOutputStream);
final RecordDescriptor outRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(outRecordDesc,
printStream);
- final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDesc);
+ final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc);
return new AbstractUnaryInputSinkOperatorNodePushable() {
IFrameWriter datasetPartitionWriter;
@@ -94,12 +96,8 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
resultSerializer.appendTuple(frameTupleAccessor, tIndex);
if (!frameOutputStream.appendTuple()) {
- datasetPartitionWriter.nextFrame(outputBuffer);
- frameOutputStream.reset(outputBuffer, true);
+ frameOutputStream.flush(datasetPartitionWriter);
- /* TODO(madhusudancs): This works under the assumption that no single serialized record is
- * longer than the buffer size.
- */
resultSerializer.appendTuple(frameTupleAccessor, tIndex);
frameOutputStream.appendTuple();
}
@@ -114,8 +112,7 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
@Override
public void close() throws HyracksDataException {
if (frameOutputStream.getTupleCount() > 0) {
- datasetPartitionWriter.nextFrame(outputBuffer);
- frameOutputStream.reset(outputBuffer, true);
+ frameOutputStream.flush(datasetPartitionWriter);
}
datasetPartitionWriter.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java
new file mode 100644
index 0000000..2a1a403
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+
+public abstract class AbstractFrameSorter implements IFrameSorter {
+
+ protected Logger LOGGER = Logger.getLogger(AbstractFrameSorter.class.getName());
+ static final int PTR_SIZE = 4;
+ static final int ID_FRAMEID = 0;
+ static final int ID_TUPLE_START = 1;
+ static final int ID_TUPLE_END = 2;
+ static final int ID_NORMAL_KEY = 3;
+
+ protected final int[] sortFields;
+ protected final IBinaryComparator[] comparators;
+ protected final INormalizedKeyComputer nkc;
+ protected final IFrameBufferManager bufferManager;
+ protected final FrameTupleAccessor inputTupleAccessor;
+ protected final IFrameTupleAppender outputAppender;
+ protected final IFrame outputFrame;
+ protected final int outputLimit;
+
+ protected int[] tPointers;
+ protected int tupleCount;
+
+ public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) throws HyracksDataException {
+ this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+ Integer.MAX_VALUE);
+ }
+
+ public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, int outputLimit)
+ throws HyracksDataException {
+ this.bufferManager = bufferManager;
+ this.sortFields = sortFields;
+ this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ this.comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ this.inputTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+ this.outputAppender = new FrameTupleAppender();
+ this.outputFrame = new VSizeFrame(ctx);
+ this.outputLimit = outputLimit;
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ this.tupleCount = 0;
+ this.bufferManager.reset();
+ }
+
+ @Override
+ public boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException {
+ if (bufferManager.insertFrame(inputBuffer) >= 0) {
+ return true;
+ }
+ if (getFrameCount() == 0) {
+ throw new HyracksDataException(
+ "The input frame is too big for the sorting buffer, please allocate bigger buffer size");
+ }
+ return false;
+ }
+
+ @Override
+ public void sort() throws HyracksDataException {
+ tupleCount = 0;
+ for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
+ inputTupleAccessor
+ .reset(bufferManager.getFrame(i), bufferManager.getFrameStartOffset(i),
+ bufferManager.getFrameSize(i));
+ tupleCount += inputTupleAccessor.getTupleCount();
+ }
+ if (tPointers == null || tPointers.length < tupleCount * PTR_SIZE) {
+ tPointers = new int[tupleCount * PTR_SIZE];
+ }
+ int ptr = 0;
+ int sfIdx = sortFields[0];
+ for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
+ inputTupleAccessor
+ .reset(bufferManager.getFrame(i), bufferManager.getFrameStartOffset(i),
+ bufferManager.getFrameSize(i));
+ int tCount = inputTupleAccessor.getTupleCount();
+ byte[] array = inputTupleAccessor.getBuffer().array();
+ for (int j = 0; j < tCount; ++j) {
+ int tStart = inputTupleAccessor.getTupleStartOffset(j);
+ int tEnd = inputTupleAccessor.getTupleEndOffset(j);
+ tPointers[ptr * PTR_SIZE + ID_FRAMEID] = i;
+ tPointers[ptr * PTR_SIZE + ID_TUPLE_START] = tStart;
+ tPointers[ptr * PTR_SIZE + ID_TUPLE_END] = tEnd;
+ int f0StartRel = inputTupleAccessor.getFieldStartOffset(j, sfIdx);
+ int f0EndRel = inputTupleAccessor.getFieldEndOffset(j, sfIdx);
+ int f0Start = f0StartRel + tStart + inputTupleAccessor.getFieldSlotsLength();
+ tPointers[ptr * PTR_SIZE + ID_NORMAL_KEY] =
+ nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
+ ++ptr;
+ }
+ }
+ if (tupleCount > 0) {
+ sortTupleReferences();
+ }
+ }
+
+ abstract void sortTupleReferences() throws HyracksDataException;
+
+ @Override
+ public int getFrameCount() {
+ return bufferManager.getNumFrames();
+ }
+
+ @Override
+ public boolean hasRemaining() {
+ return getFrameCount() > 0;
+ }
+
+ @Override
+ public int flush(IFrameWriter writer) throws HyracksDataException {
+ outputAppender.reset(outputFrame, true);
+ int maxFrameSize = outputFrame.getFrameSize();
+ int limit = Math.min(tupleCount, outputLimit);
+ int io = 0;
+ for (int ptr = 0; ptr < limit; ++ptr) {
+ int i = tPointers[ptr * PTR_SIZE + ID_FRAMEID];
+ int tStart = tPointers[ptr * PTR_SIZE + ID_TUPLE_START];
+ int tEnd = tPointers[ptr * PTR_SIZE + ID_TUPLE_END];
+ ByteBuffer buffer = bufferManager.getFrame(i);
+ inputTupleAccessor.reset(buffer, bufferManager.getFrameStartOffset(i), bufferManager.getFrameSize(i));
+
+ int flushed = FrameUtils.appendToWriter(writer, outputAppender, inputTupleAccessor, tStart, tEnd);
+ if (flushed > 0) {
+ maxFrameSize = Math.max(maxFrameSize, flushed);
+ io++;
+ }
+ }
+ maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
+ outputAppender.flush(writer, true);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine(
+ "Flushed records:" + limit + " out of " + tupleCount + "; Flushed through " + (io + 1) + " frames");
+ }
+ return maxFrameSize;
+ }
+
+ @Override
+ public void close() {
+ tupleCount = 0;
+ bufferManager.close();
+ tPointers = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
new file mode 100644
index 0000000..1dd35a8
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public abstract class AbstractSortRunGenerator implements IRunGenerator {
+ protected final List<RunAndMaxFrameSizePair> runAndMaxSizes;
+
+ public AbstractSortRunGenerator() {
+ runAndMaxSizes = new LinkedList<>();
+ }
+
+ abstract public ISorter getSorter() throws HyracksDataException;
+
+ @Override
+ public void open() throws HyracksDataException {
+ runAndMaxSizes.clear();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (getSorter().hasRemaining()) {
+ if (runAndMaxSizes.size() <= 0) {
+ getSorter().sort();
+ } else {
+ flushFramesToRun();
+ }
+ }
+ }
+
+ abstract protected RunFileWriter getRunFileWriter() throws HyracksDataException;
+
+ abstract protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException;
+
+ protected void flushFramesToRun() throws HyracksDataException {
+ getSorter().sort();
+ RunFileWriter runWriter = getRunFileWriter();
+ IFrameWriter flushWriter = getFlushableFrameWriter(runWriter);
+ flushWriter.open();
+ int maxFlushedFrameSize;
+ try {
+ maxFlushedFrameSize = getSorter().flush(flushWriter);
+ } finally {
+ flushWriter.close();
+ }
+ runAndMaxSizes.add(new RunAndMaxFrameSizePair(runWriter.createReader(), maxFlushedFrameSize));
+ getSorter().reset();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public List<RunAndMaxFrameSizePair> getRuns() {
+ return runAndMaxSizes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
new file mode 100644
index 0000000..0c1c622
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorDescriptor {
+
+ private static final Logger LOGGER = Logger.getLogger(AbstractSorterOperatorDescriptor.class.getName());
+
+ private static final long serialVersionUID = 1L;
+
+ protected static final int SORT_ACTIVITY_ID = 0;
+ protected static final int MERGE_ACTIVITY_ID = 1;
+
+ protected final int[] sortFields;
+ protected final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+ protected final IBinaryComparatorFactory[] comparatorFactories;
+ protected final int framesLimit;
+
+ public AbstractSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ this.sortFields = sortFields;
+ this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+ this.comparatorFactories = comparatorFactories;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public abstract SortActivity getSortActivity(ActivityId id);
+
+ public abstract MergeActivity getMergeActivity(ActivityId id);
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ SortActivity sa = getSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+ MergeActivity ma = getMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+
+ builder.addActivity(this, sa);
+ builder.addSourceEdge(0, sa, 0);
+
+ builder.addActivity(this, ma);
+ builder.addTargetEdge(0, ma, 0);
+
+ builder.addBlockingEdge(sa, ma);
+ }
+
+ public static class SortTaskState extends AbstractStateObject {
+ public List<RunAndMaxFrameSizePair> runAndMaxFrameSizePairs;
+ public ISorter sorter;
+
+ public SortTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+ }
+
+ protected abstract class SortActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public SortActivity(ActivityId id) {
+ super(id);
+ }
+
+ protected abstract AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider) throws HyracksDataException;
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private AbstractSortRunGenerator runGen;
+
+ @Override
+ public void open() throws HyracksDataException {
+ runGen = getRunGenerator(ctx, recordDescProvider);
+ runGen.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ runGen.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
+ runGen.close();
+ state.runAndMaxFrameSizePairs = runGen.getRuns();
+ state.sorter = runGen.getSorter();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("InitialNumberOfRuns:" + runGen.getRuns().size());
+ }
+ ctx.setStateObject(state);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ runGen.fail();
+ }
+ };
+ return op;
+ }
+ }
+
+ protected abstract class MergeActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public MergeActivity(ActivityId id) {
+ super(id);
+ }
+
+ protected abstract ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
+ List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+ int necessaryFrames);
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ SortTaskState state = (SortTaskState) ctx
+ .getStateObject(new TaskId(new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
+ List<RunAndMaxFrameSizePair> runs = state.runAndMaxFrameSizePairs;
+ ISorter sorter = state.sorter;
+ IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ?
+ null :
+ firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ ExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter, runs,
+ comparators, nmkComputer, framesLimit);
+ merger.process();
+ }
+ };
+ return op;
+ }
+ }
+
+}