Posted to commits@asterixdb.apache.org by ji...@apache.org on 2015/06/18 06:22:26 UTC

[09/14] incubator-asterixdb-hyracks git commit: VariableSizeFrame(VSizeFrame) support for Hyracks.

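This patch replaces raw, fixed-size ByteBuffer frames with the IFrame abstraction (VSizeFrame being the growable implementation), so a frame can expand beyond the initial frame size when a single record does not fit. The recurring mechanical changes in the hunks below are: ctx.allocateFrame() becomes new VSizeFrame(ctx), ByteBuffer.clear() becomes IFrame.reset(), frame-consuming APIs receive frame.getBuffer(), and FrameTupleAccessor/FrameTupleAppender no longer take ctx.getFrameSize(). A condensed sketch of the new surface, using only calls that appear in the hunks (the class and method names are hypothetical):

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.api.comm.IFrame;
    import edu.uci.ics.hyracks.api.comm.VSizeFrame;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    class VSizeFrameSketch {
        static ByteBuffer growable(IHyracksTaskContext ctx) throws HyracksDataException {
            IFrame frame = new VSizeFrame(ctx);     // starts at ctx.getInitialFrameSize() bytes
            frame.reset();                          // replaces ByteBuffer.clear() on the old API
            frame.ensureFrameSize(2 * ctx.getInitialFrameSize()); // grow past the initial size on demand
            return frame.getBuffer();               // raw buffer for APIs that still take ByteBuffer
        }
    }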
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 506da2e..ed25b4f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -19,7 +19,9 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
@@ -74,7 +76,7 @@ public class OptimizedHybridHashJoin {
     private final boolean isLeftOuter;
     private final INullWriter[] nullWriters1;
 
-    private ByteBuffer[] memBuffs; //Memory buffers for build
+    private IFrame[] memBuffs; //Memory buffers for build
     private int[] curPBuff; //Current (last) Buffer for each partition
     private int[] nextBuff; //Next buffer in the partition's buffer chain
     private int[] buildPSizeInTups; //Size of build partitions (in tuples)
@@ -92,9 +94,9 @@ public class OptimizedHybridHashJoin {
     private FrameTupleAppender probeTupAppenderToSpilled;
 
     private int numOfSpilledParts;
-    private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
-    private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
-    private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning 
+    private IFrame[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
+    private IFrame probeResBuff; //Buffer for probe resident partition tuples
+    private IFrame reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
 
     private int[] buildPSizeInFrames; //Used for partition tuning
     private int freeFramesCounter; //Used for partition tuning
@@ -124,8 +126,8 @@ public class OptimizedHybridHashJoin {
         this.buildRFWriters = new RunFileWriter[numOfPartitions];
         this.probeRFWriters = new RunFileWriter[numOfPartitions];
 
-        this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
-        this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+        this.accessorBuild = new FrameTupleAccessor(buildRd);
+        this.accessorProbe = new FrameTupleAccessor(probeRd);
 
         this.predEvaluator = predEval;
         this.isLeftOuter = false;
@@ -154,8 +156,8 @@ public class OptimizedHybridHashJoin {
         this.buildRFWriters = new RunFileWriter[numOfPartitions];
         this.probeRFWriters = new RunFileWriter[numOfPartitions];
 
-        this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
-        this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+        this.accessorBuild = new FrameTupleAccessor(buildRd);
+        this.accessorProbe = new FrameTupleAccessor(probeRd);
 
         this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
@@ -170,7 +172,7 @@ public class OptimizedHybridHashJoin {
     }
 
     public void initBuild() throws HyracksDataException {
-        memBuffs = new ByteBuffer[memForJoin];
+        memBuffs = new IFrame[memForJoin];
         curPBuff = new int[numOfPartitions];
         nextBuff = new int[memForJoin];
         pStatus = new BitSet(numOfPartitions);
@@ -179,19 +181,22 @@ public class OptimizedHybridHashJoin {
         buildPSizeInFrames = new int[numOfPartitions];
         freeFramesCounter = memForJoin - numOfPartitions;
 
-        for (int i = 0; i < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
-            memBuffs[i] = ctx.allocateFrame();
+        for (int i = 0; i
+                < numOfPartitions; i++) { //Allocating one buffer per partition and setting as the head of the chain of buffers for that partition
+            memBuffs[i] = new VSizeFrame(ctx);
             curPBuff[i] = i;
             nextBuff[i] = -1;
             buildPSizeInFrames[i] = 1; //The dedicated initial buffer
         }
 
-        nextFreeBuffIx = ((numOfPartitions < memForJoin) ? numOfPartitions : NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
+        nextFreeBuffIx = ((numOfPartitions < memForJoin) ?
+                numOfPartitions :
+                NO_MORE_FREE_BUFFER); //Setting the chain of unallocated frames
         for (int i = numOfPartitions; i < memBuffs.length; i++) {
             nextBuff[i] = UNALLOCATED_FRAME;
         }
 
-        buildTupAppender = new FrameTupleAppender(ctx.getFrameSize());
+        buildTupAppender = new FrameTupleAppender();
 
     }
 
@@ -213,7 +218,7 @@ public class OptimizedHybridHashJoin {
     }
 
     private void processTuple(int tid, int pid) throws HyracksDataException {
-        ByteBuffer partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
+        IFrame partition = memBuffs[curPBuff[pid]]; //Getting current buffer for the target partition
 
         if (!pStatus.get(pid)) { //resident partition
             buildTupAppender.reset(partition, false);
@@ -226,7 +231,8 @@ public class OptimizedHybridHashJoin {
                 if (newBuffIx == NO_MORE_FREE_BUFFER) { //Spill one partition
                     int pidToSpill = selectPartitionToSpill();
                     if (pidToSpill == -1) { //No more partition to spill
-                        throw new HyracksDataException("not enough memory for Hash Join (Allocation exceeds the limit)");
+                        throw new HyracksDataException(
+                                "not enough memory for Hash Join (Allocation exceeds the limit)");
                     }
                     spillPartition(pidToSpill);
                     buildTupAppender.reset(memBuffs[pidToSpill], true);
@@ -249,8 +255,8 @@ public class OptimizedHybridHashJoin {
                     break;
                 }
                 //Dedicated in-memory buffer for the partition is full, needed to be flushed first 
-                buildWrite(pid, partition);
-                partition.clear();
+                buildWrite(pid, partition.getBuffer());
+                partition.reset();
                 needClear = true;
                 buildPSizeInFrames[pid]++;
             }
@@ -260,7 +266,7 @@ public class OptimizedHybridHashJoin {
     private int allocateFreeBuffer(int pid) throws HyracksDataException {
         if (nextFreeBuffIx != NO_MORE_FREE_BUFFER) {
             if (memBuffs[nextFreeBuffIx] == null) {
-                memBuffs[nextFreeBuffIx] = ctx.allocateFrame();
+                memBuffs[nextFreeBuffIx] = new VSizeFrame(ctx);
             }
             int curPartBuffIx = curPBuff[pid];
             curPBuff[pid] = nextFreeBuffIx;
@@ -274,7 +280,7 @@ public class OptimizedHybridHashJoin {
             } else {
                 nextFreeBuffIx = oldNext;
             }
-            (memBuffs[curPBuff[pid]]).clear();
+            memBuffs[curPBuff[pid]].reset();
 
             freeFramesCounter--;
             return (curPBuff[pid]);
@@ -300,11 +306,10 @@ public class OptimizedHybridHashJoin {
                 + " frames for Thread ID " + Thread.currentThread().getId() + " (free frames: " + freeFramesCounter
                 + ").");
         int curBuffIx = curPBuff[pid];
-        ByteBuffer buff = null;
         while (curBuffIx != END_OF_PARTITION) {
-            buff = memBuffs[curBuffIx];
-            buildWrite(pid, buff);
-            buff.clear();
+            IFrame frame = memBuffs[curBuffIx];
+            buildWrite(pid, frame.getBuffer());
+            frame.reset();
 
             int freedBuffIx = curBuffIx;
             curBuffIx = nextBuff[curBuffIx];
@@ -346,8 +351,9 @@ public class OptimizedHybridHashJoin {
         }
 
         ByteBuffer buff = null;
-        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus.nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
-            buff = memBuffs[i];
+        for (int i = pStatus.nextSetBit(0); i >= 0; i = pStatus
+                .nextSetBit(i + 1)) { //flushing and DeAllocating the dedicated buffers for the spilled partitions
+            buff = memBuffs[i].getBuffer();
             accessorBuild.reset(buff);
             if (accessorBuild.getTupleCount() > 0) {
                 buildWrite(i, buff);
@@ -389,7 +395,7 @@ public class OptimizedHybridHashJoin {
     }
 
     private void partitionTune() throws HyracksDataException {
-        reloadBuffer = ctx.allocateFrame();
+        reloadBuffer = new VSizeFrame(ctx);
         ArrayList<Integer> reloadSet = selectPartitionsToReload();
         for (int i = 0; i < reloadSet.size(); i++) {
             int pid = reloadSet.get(i);
@@ -414,7 +420,6 @@ public class OptimizedHybridHashJoin {
             loadPartitionInMem(pid, buildRFWriters[pid], buffsToLoad);
         }
         reloadSet.clear();
-        reloadSet = null;
     }
 
     private void loadPartitionInMem(int pid, RunFileWriter wr, int[] buffs) throws HyracksDataException {
@@ -422,16 +427,16 @@ public class OptimizedHybridHashJoin {
         r.open();
         int counter = 0;
         ByteBuffer mBuff = null;
-        reloadBuffer.clear();
+        reloadBuffer.reset();
         while (r.nextFrame(reloadBuffer)) {
-            mBuff = memBuffs[buffs[counter]];
-            if (mBuff == null) {
-                mBuff = ctx.allocateFrame();
-                memBuffs[buffs[counter]] = mBuff;
+            if (memBuffs[buffs[counter]] == null) {
+                memBuffs[buffs[counter]] = new VSizeFrame(ctx);
             }
-            FrameUtils.copy(reloadBuffer, mBuff);
+            memBuffs[buffs[counter]].ensureFrameSize(reloadBuffer.getFrameSize());
+            mBuff = memBuffs[buffs[counter]].getBuffer();
+            FrameUtils.copyAndFlip(reloadBuffer.getBuffer(), mBuff);
             counter++;
-            reloadBuffer.clear();
+            reloadBuffer.reset();
         }
 
         int curNext = nextBuff[buffs[buffs.length - 1]];
@@ -459,10 +464,10 @@ public class OptimizedHybridHashJoin {
 
     private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
         ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
-        this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
-                new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
-                        ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
-                        comparators), isLeftOuter, nullWriters1, table, predEvaluator, isReversed);
+        this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc,
+                new FrameTupleAccessor(buildRd), buildHpc,
+                new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nullWriters1, table,
+                predEvaluator, isReversed);
     }
 
     private void cacheInMemJoin() throws HyracksDataException {
@@ -471,7 +476,7 @@ public class OptimizedHybridHashJoin {
             if (!pStatus.get(pid)) {
                 int nextBuffIx = curPBuff[pid];
                 while (nextBuffIx > -1) { //It is not Invalid or End_Of_Partition
-                    inMemJoiner.build(memBuffs[nextBuffIx]);
+                    inMemJoiner.build(memBuffs[nextBuffIx].getBuffer());
                     nextBuffIx = nextBuff[nextBuffIx];
                 }
             }
@@ -480,9 +485,9 @@ public class OptimizedHybridHashJoin {
 
     public void initProbe() throws HyracksDataException {
 
-        sPartBuffs = new ByteBuffer[numOfSpilledParts];
+        sPartBuffs = new IFrame[numOfSpilledParts];
         for (int i = 0; i < numOfSpilledParts; i++) {
-            sPartBuffs[i] = ctx.allocateFrame();
+            sPartBuffs[i] = new VSizeFrame(ctx);
         }
         curPBuff = new int[numOfPartitions];
         int nextBuffIxToAlloc = 0;
@@ -495,12 +500,12 @@ public class OptimizedHybridHashJoin {
         probePSizeInTups = new int[numOfPartitions];
         probeRFWriters = new RunFileWriter[numOfPartitions];
 
-        probeResBuff = ctx.allocateFrame();
+        probeResBuff = new VSizeFrame(ctx);
 
-        probeTupAppenderToResident = new FrameTupleAppender(ctx.getFrameSize());
+        probeTupAppenderToResident = new FrameTupleAppender();
         probeTupAppenderToResident.reset(probeResBuff, true);
 
-        probeTupAppenderToSpilled = new FrameTupleAppender(ctx.getFrameSize());
+        probeTupAppenderToSpilled = new FrameTupleAppender();
 
     }
 
@@ -517,21 +522,20 @@ public class OptimizedHybridHashJoin {
             inMemJoiner.join(buffer, writer);
             return;
         }
-        ByteBuffer buff = null;
         for (int i = 0; i < tupleCount; ++i) {
             int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
 
-            if (buildPSizeInTups[pid] > 0) { //Tuple has potential match from previous phase
+            if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
                 if (pStatus.get(pid)) { //pid is Spilled
                     boolean needToClear = false;
-                    buff = sPartBuffs[curPBuff[pid]];
+                    IFrame frame = sPartBuffs[curPBuff[pid]];
                     while (true) {
-                        probeTupAppenderToSpilled.reset(buff, needToClear);
+                        probeTupAppenderToSpilled.reset(frame, needToClear);
                         if (probeTupAppenderToSpilled.append(accessorProbe, i)) {
                             break;
                         }
-                        probeWrite(pid, buff);
-                        buff.clear();
+                        probeWrite(pid, frame.getBuffer());
+                        frame.reset();
                         needToClear = true;
                     }
                 } else { //pid is Resident
@@ -539,7 +543,7 @@ public class OptimizedHybridHashJoin {
                         if (probeTupAppenderToResident.append(accessorProbe, i)) {
                             break;
                         }
-                        inMemJoiner.join(probeResBuff, writer);
+                        inMemJoiner.join(probeResBuff.getBuffer(), writer);
                         probeTupAppenderToResident.reset(probeResBuff, true);
                     }
 
@@ -551,13 +555,13 @@ public class OptimizedHybridHashJoin {
 
     }
 
-    public void closeProbe(IFrameWriter writer) throws HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
-        inMemJoiner.join(probeResBuff, writer);
+    public void closeProbe(IFrameWriter writer) throws
+            HyracksDataException { //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+        inMemJoiner.join(probeResBuff.getBuffer(), writer);
         inMemJoiner.closeJoin(writer);
 
-        ByteBuffer buff = null;
         for (int pid = pStatus.nextSetBit(0); pid >= 0; pid = pStatus.nextSetBit(pid + 1)) {
-            buff = sPartBuffs[curPBuff[pid]];
+            ByteBuffer buff = sPartBuffs[curPBuff[pid]].getBuffer();
             accessorProbe.reset(buff);
             if (accessorProbe.getTupleCount() > 0) {
                 probeWrite(pid, buff);
@@ -651,10 +655,10 @@ public class OptimizedHybridHashJoin {
 
         double avgBuildSpSz = sumOfBuildSpilledSizes / numOfSpilledPartitions;
         double avgProbeSpSz = sumOfProbeSpilledSizes / numOfSpilledPartitions;
-        String s = "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t"
-                + numOfSpilledPartitions + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t"
-                + avgProbeSpSz + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t"
-                + freeFramesCounter;
+        String s =
+                "Resident Partitions:\t" + numOfResidentPartitions + "\nSpilled Partitions:\t" + numOfSpilledPartitions
+                        + "\nAvg Build Spilled Size:\t" + avgBuildSpSz + "\nAvg Probe Spilled Size:\t" + avgProbeSpSz
+                        + "\nIn-Memory Tups:\t" + numOfInMemTups + "\nNum of Free Buffers:\t" + freeFramesCounter;
         return s;
     }
 

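The hunks above all follow one pattern: each partition buffer becomes an IFrame, writes go through frame.getBuffer(), and frame.reset() takes over from ByteBuffer.clear() once a frame has been flushed. A minimal sketch of the append-then-spill loop under those contracts (spillWriter stands in for the class's private buildWrite/probeWrite helpers; the names are hypothetical):

    import edu.uci.ics.hyracks.api.comm.IFrame;
    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

    class SpillLoopSketch {
        // Append tuple tid into the partition's dedicated frame; when the frame
        // is full, flush its raw buffer to the run file and reset the frame.
        static void appendOrSpill(FrameTupleAppender appender, IFrame partition,
                FrameTupleAccessor accessor, int tid, IFrameWriter spillWriter)
                throws HyracksDataException {
            appender.reset(partition, false);
            while (!appender.append(accessor, tid)) {
                spillWriter.nextFrame(partition.getBuffer()); // stands in for buildWrite(pid, ...)
                partition.reset();                            // IFrame.reset() replaces clear()
                appender.reset(partition, true);
            }
        }
    }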
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 0494288..840eb75 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -368,7 +370,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
-                private ByteBuffer rPartbuff = ctx.allocateFrame();
+                private IFrame rPartbuff = new VSizeFrame(ctx);
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -397,13 +399,14 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                     BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
 
-                    rPartbuff.clear();
+                    rPartbuff.reset();
                     for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
 
                         RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
                         RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
 
-                        if (bReader == null || pReader == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
+                        if (bReader == null || pReader
+                                == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
                             continue;
                         }
                         int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
@@ -423,10 +426,14 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
 
-                    long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj
-                            .getBuildPartitionSize(pid) / ctx.getFrameSize());
-                    long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
-                            .getProbePartitionSize(pid) / ctx.getFrameSize());
+                    long buildPartSize = wasReversed ?
+                            (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize()) :
+                            (ohhj
+                                    .getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
+                    long probePartSize = wasReversed ?
+                            (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize()) :
+                            (ohhj
+                                    .getProbePartitionSize(pid) / ctx.getInitialFrameSize());
 
                     LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
                             + pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
@@ -437,7 +444,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
                             || (probePartSize < state.memForJoin && !isLeftOuter)) {
                         int tabSize = -1;
-                        if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter || (buildPartSize
+                                < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
                             LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
                                     + level + "]");
                             tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj
@@ -450,8 +458,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc,
                                     buildSideReader, probeSideReader, false, pid); //checked-confirmed
                         } else { //Case 1.2 - InMemHJ with Role Reversal
-                            LOGGER.fine("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
-                                    + level + "]");
+                            LOGGER.fine(
+                                    "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
+                                            + level + "]");
                             tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj
                                     .getProbePartitionSizeInTup(pid);
                             if (tabSize == 0) {
@@ -467,7 +476,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     else {
                         LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
                         OptimizedHybridHashJoin rHHj;
-                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                        if (!forceRR && (isLeftOuter
+                                || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
                             LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                     + level + "]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
@@ -478,18 +488,18 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                             buildSideReader.open();
                             rHHj.initBuild();
-                            rPartbuff.clear();
+                            rPartbuff.reset();
                             while (buildSideReader.nextFrame(rPartbuff)) {
-                                rHHj.build(rPartbuff);
+                                rHHj.build(rPartbuff.getBuffer());
                             }
 
                             rHHj.closeBuild();
 
                             probeSideReader.open();
                             rHHj.initProbe();
-                            rPartbuff.clear();
+                            rPartbuff.reset();
                             while (probeSideReader.nextFrame(rPartbuff)) {
-                                rHHj.probe(rPartbuff, writer);
+                                rHHj.probe(rPartbuff.getBuffer(), writer);
                             }
                             rHHj.closeProbe(writer);
 
@@ -499,10 +509,13 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                     : maxAfterProbeSize;
 
                             BitSet rPStatus = rHHj.getPartitionStatus();
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
-                                LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                        + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
+                                    * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+                                LOGGER.fine(
+                                        "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                                + level + "]");
+                                for (int rPid = rPStatus.nextSetBit(0);
+                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -510,13 +523,16 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
+                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1),
+                                            false); //checked-confirmed
                                 }
 
                             } else { //Case 2.1.2 - Switch to NLJ
-                                LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                        + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                LOGGER.fine(
+                                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                                + level + "]");
+                                for (int rPid = rPStatus.nextSetBit(0);
+                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -547,16 +563,16 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                             probeSideReader.open();
                             rHHj.initBuild();
-                            rPartbuff.clear();
+                            rPartbuff.reset();
                             while (probeSideReader.nextFrame(rPartbuff)) {
-                                rHHj.build(rPartbuff);
+                                rHHj.build(rPartbuff.getBuffer());
                             }
                             rHHj.closeBuild();
                             rHHj.initProbe();
                             buildSideReader.open();
-                            rPartbuff.clear();
+                            rPartbuff.reset();
                             while (buildSideReader.nextFrame(rPartbuff)) {
-                                rHHj.probe(rPartbuff, writer);
+                                rHHj.probe(rPartbuff.getBuffer(), writer);
                             }
                             rHHj.closeProbe(writer);
                             int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
@@ -565,10 +581,12 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                     : maxAfterProbeSize;
                             BitSet rPStatus = rHHj.getPartitionStatus();
 
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
+                                    * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
                                 LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
                                         + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                for (int rPid = rPStatus.nextSetBit(0);
+                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -576,12 +594,15 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
+                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1),
+                                            true); //checked-confirmed
                                 }
                             } else { //Case 2.2.2 - Switch to NLJ
-                                LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
-                                        + level + "]");
-                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                LOGGER.fine(
+                                        "\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
+                                                + level + "]");
+                                for (int rPid = rPStatus.nextSetBit(0);
+                                     rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
@@ -611,27 +632,27 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                         ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader,
                         boolean reverse, int pid) throws HyracksDataException {
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
-                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
-                            ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
-                            buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
-                            isLeftOuter, nullWriters1, table, predEvaluator, reverse);
+                    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc),
+                            hpcRepLarger, new FrameTupleAccessor(buildRDesc), hpcRepSmaller,
+                            new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table,
+                            predEvaluator, reverse);
 
                     bReader.open();
-                    rPartbuff.clear();
+                    rPartbuff.reset();
                     while (bReader.nextFrame(rPartbuff)) {
-                        ByteBuffer copyBuffer = ctx.allocateFrame(); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
-                        FrameUtils.copy(rPartbuff, copyBuffer);
-                        FrameUtils.makeReadable(copyBuffer);
+                        ByteBuffer copyBuffer = ctx
+                                .allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+                        FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                         joiner.build(copyBuffer);
-                        rPartbuff.clear();
+                        rPartbuff.reset();
                     }
                     bReader.close();
-                    rPartbuff.clear();
+                    rPartbuff.reset();
                     // probe
                     pReader.open();
                     while (pReader.nextFrame(rPartbuff)) {
-                        joiner.join(rPartbuff, writer);
-                        rPartbuff.clear();
+                        joiner.join(rPartbuff.getBuffer(), writer);
+                        rPartbuff.reset();
                     }
                     pReader.close();
                     joiner.closeJoin(writer);
@@ -640,27 +661,26 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                         RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
                         boolean reverse) throws HyracksDataException {
-                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
-                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize,
+                    NestedLoopJoin nlj = new NestedLoopJoin(ctx,
+                            new FrameTupleAccessor(outerRd),
+                            new FrameTupleAccessor(innerRd), nljComparator, memorySize,
                             predEvaluator, isLeftOuter, nullWriters1);
                     nlj.setIsReversed(reverse);
 
-                    ByteBuffer cacheBuff = ctx.allocateFrame();
+                    IFrame cacheBuff = new VSizeFrame(ctx);
                     innerReader.open();
                     while (innerReader.nextFrame(cacheBuff)) {
-                        FrameUtils.makeReadable(cacheBuff);
-                        nlj.cache(cacheBuff);
-                        cacheBuff.clear();
+                        nlj.cache(cacheBuff.getBuffer());
+                        cacheBuff.reset();
                     }
                     nlj.closeCache();
 
-                    ByteBuffer joinBuff = ctx.allocateFrame();
+                    IFrame joinBuff = new VSizeFrame(ctx);
                     outerReader.open();
 
                     while (outerReader.nextFrame(joinBuff)) {
-                        FrameUtils.makeReadable(joinBuff);
-                        nlj.join(joinBuff, writer);
-                        joinBuff.clear();
+                        nlj.join(joinBuff.getBuffer(), writer);
+                        joinBuff.reset();
                     }
 
                     nlj.closeJoin(writer);

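Two details in this file are worth noting. The copy buffer handed to InMemoryHashJoin is now allocated with ctx.allocateFrame(rPartbuff.getFrameSize()), since the source frame may have grown past the initial size, and FrameUtils.copyAndFlip collapses the old copy + makeReadable pair. A sketch of that build loop, assuming only the calls visible in the hunk (the method name is hypothetical):

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.api.comm.IFrame;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
    import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
    import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoin;

    class BuildFromRunSketch {
        static void buildFromRun(IHyracksTaskContext ctx, RunFileReader reader,
                IFrame scratch, InMemoryHashJoin joiner) throws HyracksDataException {
            reader.open();
            scratch.reset();
            while (reader.nextFrame(scratch)) {
                // The joiner keeps the buffer it is given, so copy into a fresh
                // buffer sized to whatever the scratch frame has grown to.
                ByteBuffer copy = ctx.allocateFrame(scratch.getFrameSize());
                FrameUtils.copyAndFlip(scratch.getBuffer(), copy);
                joiner.build(copy);
                scratch.reset();
            }
            reader.close();
        }
    }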
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 12f319f..ffe3abd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -15,12 +15,10 @@
 
 package edu.uci.ics.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
@@ -41,14 +39,12 @@ public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutput
 
     @Override
     public void initialize() throws HyracksDataException {
-        ByteBuffer writeBuffer = ctx.allocateFrame();
-        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-        appender.reset(writeBuffer, true);
+        FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
         if (fieldSlots != null && tupleData != null && tupleSize > 0)
             appender.append(fieldSlots, tupleData, 0, tupleSize);
         writer.open();
         try {
-            FrameUtils.flushFrame(writeBuffer, writer);
+            appender.flush(writer, false);
         } catch (Exception e) {
             writer.fail();
             throw new HyracksDataException(e);

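The tuple source no longer manages a buffer by hand: FrameTupleAppender can now own a frame passed at construction, and flush(writer, false) replaces the manual FrameUtils.flushFrame call. A minimal sketch under those assumptions (the class and method names are hypothetical):

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.comm.VSizeFrame;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

    class EmitTupleSketch {
        static void emit(IHyracksTaskContext ctx, IFrameWriter writer,
                int[] fieldSlots, byte[] tupleData, int tupleSize) throws HyracksDataException {
            // The appender allocates and owns its frame...
            FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
            appender.append(fieldSlots, tupleData, 0, tupleSize);
            // ...and flush(writer, false) pushes the frame without clearing it.
            appender.flush(writer, false);
        }
    }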
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
index 1fff4fe..4356181 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -16,6 +16,7 @@ package edu.uci.ics.hyracks.dataflow.std.misc;
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -50,7 +51,7 @@ public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescr
 
             @Override
             public void open() throws HyracksDataException {
-                fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptors[0]);
+                fta = new FrameTupleAccessor(recordDescriptors[0]);
                 currentSize = 0;
                 finished = false;
                 writer.open();
@@ -62,16 +63,13 @@ public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescr
                     fta.reset(buffer);
                     int count = fta.getTupleCount();
                     if ((currentSize + count) > outputLimit) {
-                        ByteBuffer b = ctx.allocateFrame();
-                        FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
-                        partialAppender.reset(b, true);
+                        FrameTupleAppender partialAppender = new FrameTupleAppender(new VSizeFrame(ctx));
                         int copyCount = outputLimit - currentSize;
                         for (int i = 0; i < copyCount; i++) {
-                            partialAppender.append(fta, i);
+                            FrameUtils.appendToWriter(writer, partialAppender, fta, i);
                             currentSize++;
                         }
-                        FrameUtils.makeReadable(b);
-                        FrameUtils.flushFrame(b, writer);
+                        partialAppender.flush(writer,false);
                         finished = true;
                     } else {
                         FrameUtils.flushFrame(buffer, writer);

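The limit operator's partial-frame path switches from partialAppender.append(fta, i) to FrameUtils.appendToWriter(writer, partialAppender, fta, i), which flushes the appender's frame downstream whenever it fills, so the copied prefix no longer has to fit into a single frame. A sketch of that loop (hypothetical method name, calls as in the hunk):

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.comm.VSizeFrame;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
    import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

    class CopyPrefixSketch {
        static void copyPrefix(IHyracksTaskContext ctx, IFrameWriter writer,
                FrameTupleAccessor fta, int copyCount) throws HyracksDataException {
            FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
            for (int i = 0; i < copyCount; i++) {
                FrameUtils.appendToWriter(writer, appender, fta, i); // flushes full frames itself
            }
            appender.flush(writer, false); // push the remaining partial frame
        }
    }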
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 48de837..9339b34 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -19,6 +19,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -27,6 +28,7 @@ import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 
 public class MaterializerTaskState extends AbstractStateObject {
@@ -61,15 +63,13 @@ public class MaterializerTaskState extends AbstractStateObject {
         out.nextFrame(buffer);
     }
 
-    public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
+    public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
         RunFileReader in = out.createReader();
         writer.open();
         try {
             in.open();
             while (in.nextFrame(frame)) {
-                frame.flip();
-                writer.nextFrame(frame);
-                frame.clear();
+                writer.nextFrame(frame.getBuffer());
             }
             in.close();
         } catch (Exception e) {

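writeOut now takes an IFrame, and RunFileReader.nextFrame(IFrame) leaves the frame's buffer readable, so the flip()/clear() pair around each writer.nextFrame call disappears. The same signature change is what lets the Materializing and Split operators below simply pass new VSizeFrame(ctx). A condensed sketch of the replay loop under those contracts:

    import edu.uci.ics.hyracks.api.comm.IFrame;
    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;

    class ReplaySketch {
        // Replay a materialized run: the reader refills the frame each call and
        // hands back a readable buffer, so no per-frame flip()/clear() is needed.
        static void replay(RunFileReader in, IFrame frame, IFrameWriter writer)
                throws HyracksDataException {
            in.open();
            try {
                while (in.nextFrame(frame)) {
                    writer.nextFrame(frame.getBuffer());
                }
            } finally {
                in.close();
            }
        }
    }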
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 3a405d0..36fdd50 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -16,6 +16,7 @@ package edu.uci.ics.hyracks.dataflow.std.misc;
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -25,6 +26,8 @@ import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -107,8 +110,7 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
                 @Override
                 public void close() throws HyracksDataException {
                     state.close();
-                    ByteBuffer frame = ctx.allocateFrame();
-                    state.writeOut(writer, frame);
+                    state.writeOut(writer, new VSizeFrame(ctx));
                 }
 
             };
@@ -166,10 +168,9 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    ByteBuffer frame = ctx.allocateFrame();
                     MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                             getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
-                    state.writeOut(writer, frame);
+                    state.writeOut(writer, new VSizeFrame(ctx));
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index b8e1ac8..1f6d965 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -17,6 +17,7 @@ package edu.uci.ics.hyracks.dataflow.std.misc;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -172,10 +173,9 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    ByteBuffer frame = ctx.allocateFrame();
                     MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                             getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
-                    state.writeOut(writer, frame);
+                    state.writeOut(writer, new VSizeFrame(ctx));
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 42ed59e..3a6b5d2 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -18,7 +18,9 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrame;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -61,17 +63,17 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
             throws HyracksDataException {
         final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
 
-        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final IFrame frame = new VSizeFrame(ctx);
 
-        final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
-        frameOutputStream.reset(outputBuffer, true);
+        final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getInitialFrameSize());
+        frameOutputStream.reset(frame, true);
         PrintStream printStream = new PrintStream(frameOutputStream);
 
         final RecordDescriptor outRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(outRecordDesc,
                 printStream);
 
-        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDesc);
+        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(outRecordDesc);
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             IFrameWriter datasetPartitionWriter;
@@ -94,12 +96,8 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
                 for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
                     resultSerializer.appendTuple(frameTupleAccessor, tIndex);
                     if (!frameOutputStream.appendTuple()) {
-                        datasetPartitionWriter.nextFrame(outputBuffer);
-                        frameOutputStream.reset(outputBuffer, true);
+                        frameOutputStream.flush(datasetPartitionWriter);
 
-                        /* TODO(madhusudancs): This works under the assumption that no single serialized record is
-                         * longer than the buffer size.
-                         */
                         resultSerializer.appendTuple(frameTupleAccessor, tIndex);
                         frameOutputStream.appendTuple();
                     }
@@ -114,8 +112,7 @@ public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperat
             @Override
             public void close() throws HyracksDataException {
                 if (frameOutputStream.getTupleCount() > 0) {
-                    datasetPartitionWriter.nextFrame(outputBuffer);
-                    frameOutputStream.reset(outputBuffer, true);
+                    frameOutputStream.flush(datasetPartitionWriter);
                 }
                 datasetPartitionWriter.close();
             }

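FrameOutputStream now writes into an IFrame and gains flush(IFrameWriter), replacing the manual nextFrame(outputBuffer)/reset(outputBuffer, true) pair; since the frame can grow, the old TODO about records longer than one buffer also goes away. A sketch of the append-retry protocol, assuming IResultSerializer and FrameOutputStream live in the packages the surrounding imports suggest:

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameOutputStream;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

    class ResultFlushSketch {
        static void writeAll(FrameOutputStream out, IFrameWriter writer,
                IResultSerializer serializer, FrameTupleAccessor fta) throws HyracksDataException {
            for (int t = 0; t < fta.getTupleCount(); t++) {
                serializer.appendTuple(fta, t);       // serialize the tuple into the stream
                if (!out.appendTuple()) {
                    out.flush(writer);                // frame full: push it downstream
                    serializer.appendTuple(fta, t);   // retry on the now-empty frame
                    out.appendTuple();
                }
            }
            if (out.getTupleCount() > 0) {
                out.flush(writer);                    // final partial frame
            }
        }
    }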
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java
new file mode 100644
index 0000000..2a1a403
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAppender;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
+
+public abstract class AbstractFrameSorter implements IFrameSorter {
+
+    protected Logger LOGGER = Logger.getLogger(AbstractFrameSorter.class.getName());
+    static final int PTR_SIZE = 4;
+    static final int ID_FRAMEID = 0;
+    static final int ID_TUPLE_START = 1;
+    static final int ID_TUPLE_END = 2;
+    static final int ID_NORMAL_KEY = 3;
+
+    protected final int[] sortFields;
+    protected final IBinaryComparator[] comparators;
+    protected final INormalizedKeyComputer nkc;
+    protected final IFrameBufferManager bufferManager;
+    protected final FrameTupleAccessor inputTupleAccessor;
+    protected final IFrameTupleAppender outputAppender;
+    protected final IFrame outputFrame;
+    protected final int outputLimit;
+
+    protected int[] tPointers;
+    protected int tupleCount;
+
+    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) throws HyracksDataException {
+        this(ctx, bufferManager, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDescriptor,
+                Integer.MAX_VALUE);
+    }
+
+    public AbstractFrameSorter(IHyracksTaskContext ctx, IFrameBufferManager bufferManager, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, int outputLimit)
+            throws HyracksDataException {
+        this.bufferManager = bufferManager;
+        this.sortFields = sortFields;
+        this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+        this.comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.inputTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.outputAppender = new FrameTupleAppender();
+        this.outputFrame = new VSizeFrame(ctx);
+        this.outputLimit = outputLimit;
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        this.tupleCount = 0;
+        this.bufferManager.reset();
+    }
+
+    @Override
+    public boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException {
+        if (bufferManager.insertFrame(inputBuffer) >= 0) {
+            return true;
+        }
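+        // Insertion failed on an empty buffer: the frame alone exceeds the sort memory budget.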
+        if (getFrameCount() == 0) {
+            throw new HyracksDataException(
+                    "The input frame is too big for the sorting buffer, please allocate bigger buffer size");
+        }
+        return false;
+    }
+
+    @Override
+    public void sort() throws HyracksDataException {
+        tupleCount = 0;
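+        // First pass: count the tuples buffered across all frames to size the pointer array.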
+        for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
+            inputTupleAccessor.reset(bufferManager.getFrame(i),
+                    bufferManager.getFrameStartOffset(i),
+                    bufferManager.getFrameSize(i));
+            tupleCount += inputTupleAccessor.getTupleCount();
+        }
+        if (tPointers == null || tPointers.length < tupleCount * PTR_SIZE) {
+            tPointers = new int[tupleCount * PTR_SIZE];
+        }
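+        // Second pass: record each tuple's frame id, offsets, and normalized key.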
+        int ptr = 0;
+        int sfIdx = sortFields[0];
+        for (int i = 0; i < bufferManager.getNumFrames(); ++i) {
+            inputTupleAccessor.reset(bufferManager.getFrame(i),
+                    bufferManager.getFrameStartOffset(i),
+                    bufferManager.getFrameSize(i));
+            int tCount = inputTupleAccessor.getTupleCount();
+            byte[] array = inputTupleAccessor.getBuffer().array();
+            for (int j = 0; j < tCount; ++j) {
+                int tStart = inputTupleAccessor.getTupleStartOffset(j);
+                int tEnd = inputTupleAccessor.getTupleEndOffset(j);
+                tPointers[ptr * PTR_SIZE + ID_FRAMEID] = i;
+                tPointers[ptr * PTR_SIZE + ID_TUPLE_START] = tStart;
+                tPointers[ptr * PTR_SIZE + ID_TUPLE_END] = tEnd;
+                int f0StartRel = inputTupleAccessor.getFieldStartOffset(j, sfIdx);
+                int f0EndRel = inputTupleAccessor.getFieldEndOffset(j, sfIdx);
+                int f0Start = f0StartRel + tStart + inputTupleAccessor.getFieldSlotsLength();
+                tPointers[ptr * PTR_SIZE + ID_NORMAL_KEY] =
+                        nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
+                ++ptr;
+            }
+        }
+        if (tupleCount > 0) {
+            sortTupleReferences();
+        }
+    }
+
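+    // Subclasses supply the concrete in-memory sort over tPointers.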
+    abstract void sortTupleReferences() throws HyracksDataException;
+
+    @Override
+    public int getFrameCount() {
+        return bufferManager.getNumFrames();
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return getFrameCount() > 0;
+    }
+
+    @Override
+    public int flush(IFrameWriter writer) throws HyracksDataException {
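+        // Emit the tuples in sorted order and return the largest frame size written,
+        // which the caller records so the merge phase can size its buffers.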
+        outputAppender.reset(outputFrame, true);
+        int maxFrameSize = outputFrame.getFrameSize();
+        int limit = Math.min(tupleCount, outputLimit);
+        int io = 0;
+        for (int ptr = 0; ptr < limit; ++ptr) {
+            int i = tPointers[ptr * PTR_SIZE + ID_FRAMEID];
+            int tStart = tPointers[ptr * PTR_SIZE + ID_TUPLE_START];
+            int tEnd = tPointers[ptr * PTR_SIZE + ID_TUPLE_END];
+            ByteBuffer buffer = bufferManager.getFrame(i);
+            inputTupleAccessor.reset(buffer, bufferManager.getFrameStartOffset(i), bufferManager.getFrameSize(i));
+
+            int flushed = FrameUtils.appendToWriter(writer, outputAppender, inputTupleAccessor, tStart, tEnd);
+            if (flushed > 0) {
+                maxFrameSize = Math.max(maxFrameSize, flushed);
+                io++;
+            }
+        }
+        maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
+        outputAppender.flush(writer, true);
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Flushed records: " + limit + " out of " + tupleCount
+                    + "; flushed through " + (io + 1) + " frames");
+        }
+        return maxFrameSize;
+    }
+
+    @Override
+    public void close() {
+        tupleCount = 0;
+        bufferManager.close();
+        tPointers = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
new file mode 100644
index 0000000..1dd35a8
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public abstract class AbstractSortRunGenerator implements IRunGenerator {
+    protected final List<RunAndMaxFrameSizePair> runAndMaxSizes;
+
+    public AbstractSortRunGenerator() {
+        runAndMaxSizes = new LinkedList<>();
+    }
+
+    public abstract ISorter getSorter() throws HyracksDataException;
+
+    @Override
+    public void open() throws HyracksDataException {
+        runAndMaxSizes.clear();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
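+        // If no run was ever spilled, the data fits in memory: just sort it and let
+        // the merger consume it directly; otherwise flush the tail as a final run.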
+        if (getSorter().hasRemaining()) {
+            if (runAndMaxSizes.isEmpty()) {
+                getSorter().sort();
+            } else {
+                flushFramesToRun();
+            }
+        }
+    }
+
+    protected abstract RunFileWriter getRunFileWriter() throws HyracksDataException;
+
+    protected abstract IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException;
+
+    protected void flushFramesToRun() throws HyracksDataException {
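+        // Sort the buffered tuples, spill them to a run file, and remember the
+        // largest flushed frame size for the merge phase.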
+        getSorter().sort();
+        RunFileWriter runWriter = getRunFileWriter();
+        IFrameWriter flushWriter = getFlushableFrameWriter(runWriter);
+        flushWriter.open();
+        int maxFlushedFrameSize;
+        try {
+            maxFlushedFrameSize = getSorter().flush(flushWriter);
+        } finally {
+            flushWriter.close();
+        }
+        runAndMaxSizes.add(new RunAndMaxFrameSizePair(runWriter.createReader(), maxFlushedFrameSize));
+        getSorter().reset();
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+
+    @Override
+    public List<RunAndMaxFrameSizePair> getRuns() {
+        return runAndMaxSizes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0d87a57f/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
new file mode 100644
index 0000000..0c1c622
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public abstract class AbstractSorterOperatorDescriptor extends AbstractOperatorDescriptor {
+
+    private static final Logger LOGGER = Logger.getLogger(AbstractSorterOperatorDescriptor.class.getName());
+
+    private static final long serialVersionUID = 1L;
+
+    protected static final int SORT_ACTIVITY_ID = 0;
+    protected static final int MERGE_ACTIVITY_ID = 1;
+
+    protected final int[] sortFields;
+    protected final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    protected final IBinaryComparatorFactory[] comparatorFactories;
+    protected final int framesLimit;
+
+    public AbstractSorterOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) {
+        super(spec, 1, 1);
+        this.framesLimit = framesLimit;
+        this.sortFields = sortFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public abstract SortActivity getSortActivity(ActivityId id);
+
+    public abstract MergeActivity getMergeActivity(ActivityId id);
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        SortActivity sa = getSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+        MergeActivity ma = getMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+
+        builder.addActivity(this, sa);
+        builder.addSourceEdge(0, sa, 0);
+
+        builder.addActivity(this, ma);
+        builder.addTargetEdge(0, ma, 0);
+
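+        // The merge activity may only start after the sort activity has completed.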
+        builder.addBlockingEdge(sa, ma);
+    }
+
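+    // Hands the generated runs and the in-memory sorter from the sort activity to
+    // the merge activity of the same partition.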
+    public static class SortTaskState extends AbstractStateObject {
+        public List<RunAndMaxFrameSizePair> runAndMaxFrameSizePairs;
+        public ISorter sorter;
+
+        public SortTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
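+            // No-op: the state is shared in memory via the task context, not serialized.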
+
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
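+            // No-op: see toBytes.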
+
+        }
+    }
+
+    protected abstract class SortActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public SortActivity(ActivityId id) {
+            super(id);
+        }
+
+        protected abstract AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider) throws HyracksDataException;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private AbstractSortRunGenerator runGen;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    runGen = getRunGenerator(ctx, recordDescProvider);
+                    runGen.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    runGen.nextFrame(buffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
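+                    // Publish the generated runs and the sorter so the merge activity can pick them up.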
+                    SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
+                    runGen.close();
+                    state.runAndMaxFrameSizePairs = runGen.getRuns();
+                    state.sorter = runGen.getSorter();
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("InitialNumberOfRuns:" + runGen.getRuns().size());
+                    }
+                    ctx.setStateObject(state);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    runGen.fail();
+                }
+            };
+            return op;
+        }
+    }
+
+    protected abstract class MergeActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MergeActivity(ActivityId id) {
+            super(id);
+        }
+
+        protected abstract ExternalSortRunMerger getSortRunMerger(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, IFrameWriter writer, ISorter sorter,
+                List<RunAndMaxFrameSizePair> runs, IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer,
+                int necessaryFrames);
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+                @Override
+                public void initialize() throws HyracksDataException {
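+                    // Fetch the state that the sort activity stored for this partition.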
+                    SortTaskState state = (SortTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
+                    List<RunAndMaxFrameSizePair> runs = state.runAndMaxFrameSizePairs;
+                    ISorter sorter = state.sorter;
+                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                    for (int i = 0; i < comparatorFactories.length; ++i) {
+                        comparators[i] = comparatorFactories[i].createBinaryComparator();
+                    }
+                    INormalizedKeyComputer nmkComputer = firstKeyNormalizerFactory == null ? null
+                            : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+                    ExternalSortRunMerger merger = getSortRunMerger(ctx, recordDescProvider, writer, sorter, runs,
+                            comparators, nmkComputer, framesLimit);
+                    merger.process();
+                }
+            };
+            return op;
+        }
+    }
+
+}