You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/30 00:50:57 UTC

[1/2] incubator-asterixdb-hyracks git commit: Add flush() to IFrameWriter

Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master 0b642674a -> 637e95524


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
index 24b3bf9..35ae0da 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
@@ -108,6 +108,11 @@ public abstract class AbstractDeserializedFileScanOperatorDescriptor extends Abs
         public void fail() throws HyracksDataException {
             // do nothing
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            // do nothing
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index 8228367..6e08238 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -70,6 +70,11 @@ public abstract class AbstractFileWriteOperatorDescriptor extends AbstractSingle
                 throw new HyracksDataException(e);
             }
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            // This is a kind of a sink operator and hence, flush() is a no op
+        }
     }
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 81b3c4e..0960927 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -87,7 +87,7 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
                         FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
                                 tb.getSize());
                     }
-                    appender.flush(writer, true);
+                    appender.write(writer, true);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index d041d5e..e67752b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 public class DeserializedPreclusteredGroupOperator implements IOpenableDataWriterOperator {
     private final int[] groupFields;
 
+    @SuppressWarnings("rawtypes")
     private final IComparator[] comparators;
 
     private final IGroupAggregator aggregator;
@@ -42,6 +43,7 @@ public class DeserializedPreclusteredGroupOperator implements IOpenableDataWrite
 
     private IOpenableDataReader<Object[]> reader;
 
+    @SuppressWarnings("rawtypes")
     public DeserializedPreclusteredGroupOperator(int[] groupFields, IComparator[] comparators,
             IGroupAggregator aggregator) {
         this.groupFields = groupFields;
@@ -110,6 +112,7 @@ public class DeserializedPreclusteredGroupOperator implements IOpenableDataWrite
         buffer.add(data);
     }
 
+    @SuppressWarnings("unchecked")
     private int compare(Object[] d1, Object[] d2) {
         for (int i = 0; i < groupFields.length; ++i) {
             int fIdx = groupFields[i];
@@ -126,4 +129,9 @@ public class DeserializedPreclusteredGroupOperator implements IOpenableDataWrite
         // TODO Auto-generated method stub
 
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        throw new HyracksDataException("unsupported operation");
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 052fe8c..7fbde54 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -287,7 +287,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
                             if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                     outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                                outputAppender.flush(writer, true);
+                                outputAppender.write(writer, true);
                                 if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                         outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                                     throw new HyracksDataException(
@@ -297,7 +297,7 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
                         } while (true);
                     }
-                    outputAppender.flush(writer, true);
+                    outputAppender.write(writer, true);
                     aggregator.close();
                     return;
                 }
@@ -330,14 +330,14 @@ public class HashSpillableTableFactory implements ISpillableTableFactory {
 
                     if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                             outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
-                        outputAppender.flush(writer, true);
+                        outputAppender.write(writer, true);
                         if (!outputAppender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
                                 outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
                             throw new HyracksDataException("The output item is too large to be fit into a frame.");
                         }
                     }
                 }
-                outputAppender.flush(writer, true);
+                outputAppender.write(writer, true);
                 aggregator.close();
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 779c631..aca294a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -325,14 +325,14 @@ class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOp
 
             if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
                     finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                writerAppender.flush(writer, true);
+                writerAppender.write(writer, true);
                 if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
                         finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
                     throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
                 }
             }
         }
-        writerAppender.flush(writer, true);
+        writerAppender.write(writer, true);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
index 97cea25..d2342ea 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/hash/GroupingHashTable.java
@@ -240,7 +240,7 @@ class GroupingHashTable {
                 }
             }
         }
-        appender.flush(writer, true);
+        appender.write(writer, true);
     }
 
     void close() throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 7539abf..515987a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -73,4 +73,9 @@ class PreclusteredGroupOperatorNodePushable extends AbstractUnaryInputUnaryOutpu
     public void close() throws HyracksDataException {
         pgw.close();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        pgw.flush();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 1c08b53..31f52ee 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -176,7 +176,7 @@ public class PreclusteredGroupWriter implements IFrameWriter {
             if (!isFailed && !first) {
                 assert (copyFrameAccessor.getTupleCount() > 0);
                 writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
-                appenderWrapper.flush();
+                appenderWrapper.write();
             }
             aggregator.close();
             aggregateState.close();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 2a66fb7..a139341 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -147,7 +147,7 @@ public class InMemoryHashJoin {
     }
 
     public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        appender.flush(writer, true);
+        appender.write(writer, true);
         int nFrames = buffers.size();
         buffers.clear();
         ctx.deallocateFrames(nFrames);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 5ca7700..6746b50 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -204,7 +204,7 @@ public class NestedLoopJoin {
         outBuffers.clear();
         currentMemSize = 0;
 
-        appender.flush(writer, true);
+        appender.write(writer, true);
     }
 
     private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index c0c467a..a692674 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -138,8 +138,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
             boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
 
         super(spec, 2, 1);
-        this.memsize = memsize;
         this.inputsize0 = inputsize0;
+        this.memsize = memsize;
         this.fudgeFactor = factor;
         this.probeKeys = keys0;
         this.buildKeys = keys1;
@@ -450,7 +450,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             }
                             //Build Side is smaller
                             applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc,
-                                    buildSideReader, probeSideReader, false, pid); //checked-confirmed
+                                    buildSideReader, probeSideReader, false, pid); // checked-confirmed
                         } else { //Case 1.2 - InMemHJ with Role Reversal
                             LOGGER.fine(
                                     "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
@@ -463,7 +463,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             }
                             //Probe Side is smaller
                             applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, buildHpc, probeHpc,
-                                    probeSideReader, buildSideReader, true, pid); //checked-confirmed
+                                    probeSideReader, buildSideReader, true, pid); // checked-confirmed
                         }
                     }
                     //Apply (Recursive) HHJ
@@ -515,7 +515,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
+                                    joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); // checked-confirmed
                                 }
 
                             } else { //Case 2.1.2 - Switch to NLJ
@@ -636,7 +636,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     }
                     bReader.close();
                     rPartbuff.reset();
-                    // probe
+                    //probe
                     pReader.open();
                     while (pReader.nextFrame(rPartbuff)) {
                         joiner.join(rPartbuff.getBuffer(), writer);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index 048cea2..e4b5bb3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -62,6 +62,10 @@ public class DeserializedMapperOperatorDescriptor extends AbstractSingleActivity
         public void writeData(Object[] data) throws HyracksDataException {
             mapper.map(data, writer);
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 0c647d7..af42168 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -48,7 +48,7 @@ public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutput
             appender.append(fieldSlots, tupleData, 0, tupleSize);
         writer.open();
         try {
-            appender.flush(writer, false);
+            appender.write(writer, false);
         } catch (Throwable th) {
             writer.fail();
             throw new HyracksDataException(th);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
index 971f61d..817c6d2 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
@@ -40,7 +40,7 @@ public class IdentityOperatorDescriptor extends AbstractSingleActivityOperatorDe
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override
             public void open() throws HyracksDataException {
@@ -61,6 +61,11 @@ public class IdentityOperatorDescriptor extends AbstractSingleActivityOperatorDe
             public void close() throws HyracksDataException {
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         };
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
index 5c09f42..084c9ab 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -46,7 +46,7 @@ public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescr
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
 
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             private FrameTupleAccessor fta;
@@ -73,7 +73,7 @@ public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescr
                             FrameUtils.appendToWriter(writer, partialAppender, fta, i);
                             currentSize++;
                         }
-                        partialAppender.flush(writer,false);
+                        partialAppender.write(writer, false);
                         finished = true;
                     } else {
                         FrameUtils.flushFrame(buffer, writer);
@@ -92,6 +92,11 @@ public class LimitOperatorDescriptor extends AbstractSingleActivityOperatorDescr
             public void close() throws HyracksDataException {
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 36bf919..db86980 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -30,8 +30,6 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -61,8 +59,8 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
         if (isSingleActivity) {
-            MaterializerReaderActivityNode mra = new MaterializerReaderActivityNode(new ActivityId(odId,
-                    MATERIALIZER_READER_ACTIVITY_ID));
+            MaterializerReaderActivityNode mra = new MaterializerReaderActivityNode(
+                    new ActivityId(odId, MATERIALIZER_READER_ACTIVITY_ID));
 
             builder.addActivity(this, mra);
             builder.addSourceEdge(0, mra, 0);
@@ -97,8 +95,8 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     state.open(ctx);
                 }
 
@@ -116,7 +114,6 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
                     state.close();
                     state.writeOut(writer, new VSizeFrame(ctx));
                 }
-
             };
         }
     }
@@ -136,8 +133,8 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     state.open(ctx);
                 }
 
@@ -172,8 +169,8 @@ public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
-                            getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
                     state.writeOut(writer, new VSizeFrame(ctx));
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 792a041..9ff5d9b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -62,6 +62,11 @@ public class PrinterOperatorDescriptor extends AbstractSingleActivityOperatorDes
         public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
             throw new IllegalArgumentException();
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            System.err.flush();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 82c62a5..dc368eb 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -138,6 +138,13 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
                 }
 
                 @Override
+                public void flush() throws HyracksDataException {
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        writers[i].flush();
+                    }
+                }
+
+                @Override
                 public void close() throws HyracksDataException {
                     HyracksDataException hde = null;
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index 4f12db1..41c9569 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -88,8 +88,8 @@ public class SplitVectorOperatorDescriptor extends AbstractOperatorDescriptor {
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new CollectTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
+                    state = new CollectTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));
                     state.buffer = new ArrayList<Object[]>();
                 }
 
@@ -107,9 +107,14 @@ public class SplitVectorOperatorDescriptor extends AbstractOperatorDescriptor {
                 public void fail() throws HyracksDataException {
 
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    // flush() is a no op since the frame writer's whole job is to write state data to a buffer
+                }
             };
-            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
-                    getActivityId(), 0));
+            return new DeserializedOperatorNodePushable(ctx, op,
+                    recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
         }
     }
 
@@ -138,8 +143,8 @@ public class SplitVectorOperatorDescriptor extends AbstractOperatorDescriptor {
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (CollectTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
-                            COLLECT_ACTIVITY_ID), partition));
+                    state = (CollectTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), COLLECT_ACTIVITY_ID), partition));
                 }
 
                 @Override
@@ -161,9 +166,14 @@ public class SplitVectorOperatorDescriptor extends AbstractOperatorDescriptor {
                 public void fail() throws HyracksDataException {
                     writer.fail();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
             };
-            return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getOutputRecordDescriptor(
-                    getActivityId(), 0));
+            return new DeserializedOperatorNodePushable(ctx, op,
+                    recordDescProvider.getOutputRecordDescriptor(getActivityId(), 0));
         }
     }
 
@@ -171,7 +181,8 @@ public class SplitVectorOperatorDescriptor extends AbstractOperatorDescriptor {
 
     private final int splits;
 
-    public SplitVectorOperatorDescriptor(IOperatorDescriptorRegistry spec, int splits, RecordDescriptor recordDescriptor) {
+    public SplitVectorOperatorDescriptor(IOperatorDescriptorRegistry spec, int splits,
+            RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         this.splits = splits;
         recordDescriptors[0] = recordDescriptor;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index f50d14c..e56d51d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -173,7 +173,7 @@ public abstract class AbstractFrameSorter implements IFrameSorter {
             }
         }
         maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
-        outputAppender.flush(writer, true);
+        outputAppender.write(writer, true);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine(
                     "Flushed records:" + limit + " out of " + tupleCount + "; Flushed through " + (io + 1) + " frames");

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index ea46682..c1e81ff 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -29,9 +29,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotSmallestFit;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotBiggestFirst;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotSmallestFit;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameFreeSlotPolicy;
 import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
@@ -53,7 +53,7 @@ public class ExternalSortRunGenerator extends AbstractSortRunGenerator {
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         this(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc, alg, policy, framesLimit,
                 Integer.MAX_VALUE);
     }
@@ -61,7 +61,7 @@ public class ExternalSortRunGenerator extends AbstractSortRunGenerator {
     public ExternalSortRunGenerator(IHyracksTaskContext ctx, int[] sortFields,
             INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDesc, Algorithm alg, EnumFreeSlotPolicy policy, int framesLimit, int outputLimit)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         this.ctx = ctx;
         maxSortFrames = framesLimit - 1;
 
@@ -98,12 +98,14 @@ public class ExternalSortRunGenerator extends AbstractSortRunGenerator {
         }
     }
 
+    @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                ExternalSortRunGenerator.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                .createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIOManager());
     }
 
+    @Override
     protected IFrameWriter getFlushableFrameWriter(RunFileWriter writer) throws HyracksDataException {
         return writer;
     }
@@ -112,5 +114,4 @@ public class ExternalSortRunGenerator extends AbstractSortRunGenerator {
     public ISorter getSorter() {
         return frameSorter;
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
index bf4f621..7f2847c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/HeapSortRunGenerator.java
@@ -65,8 +65,7 @@ public class HeapSortRunGenerator extends AbstractSortRunGenerator {
     public void open() throws HyracksDataException {
         IFramePool framePool = new VariableFramePool(ctx, (frameLimit - 1) * ctx.getInitialFrameSize());
         ITupleBufferManager bufferManager = new VariableTupleMemoryManager(framePool, recordDescriptor);
-        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactory,
-                comparatorFactories);
+        tupleSorter = new TupleSorterHeapSort(ctx, bufferManager, topK, sortFields, nmkFactory, comparatorFactories);
         super.open();
     }
 
@@ -77,8 +76,8 @@ public class HeapSortRunGenerator extends AbstractSortRunGenerator {
 
     @Override
     protected RunFileWriter getRunFileWriter() throws HyracksDataException {
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                HeapSortRunGenerator.class.getSimpleName());
+        FileReference file = ctx.getJobletContext()
+                .createManagedWorkspaceFile(HeapSortRunGenerator.class.getSimpleName());
         return new RunFileWriter(file, ctx.getIOManager());
     }
 
@@ -99,5 +98,4 @@ public class HeapSortRunGenerator extends AbstractSortRunGenerator {
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index 5c31f89..fc15bbc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -262,7 +262,7 @@ public class TupleSorterHeapSort implements ITupleSorter {
             }
         }
         maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
-        outputAppender.flush(writer, true);
+        outputAppender.write(writer, true);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(
                     "Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index 289b879..e76d2d3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -121,6 +121,13 @@ public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
                         }
                     }
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.flush();
+                    }
+                }
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index b656b46..92d18f0 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -76,4 +76,9 @@ public final class DeserializedOperatorNodePushable extends AbstractUnaryInputOp
     public String getDisplayName() {
         return "Deserialized(" + delegate + ")";
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        delegate.flush();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index 431a4f4..5ef6417 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -88,14 +88,14 @@ public class DataGenOperatorDescriptor extends AbstractSingleActivityOperatorDes
                         }
 
                         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            appender.flush(writer, true);
+                            appender.write(writer, true);
                             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                                 throw new HyracksDataException("Record size (" + tb.getSize()
                                         + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
                             }
                         }
                     }
-                    appender.flush(writer, true);
+                    appender.write(writer, true);
                 } catch (Throwable th) {
                     writer.fail();
                     throw new HyracksDataException(th);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
index 41e63a0..995ae30 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -45,6 +43,7 @@ import org.apache.hyracks.dataflow.common.comm.io.SerializingDataWriter;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Test;
 
 public class SerializationDeserializationTest {
     private static final Logger LOGGER = Logger.getLogger(SerializationDeserializationTest.class.getName());
@@ -82,6 +81,10 @@ public class SerializationDeserializationTest {
                 @Override
                 public void fail() throws HyracksDataException {
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             });
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
index fb816da..6b08c3e 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/ErrorReportingTest.java
@@ -187,6 +187,10 @@ class ExceptionRaisingOperatorDescriptor extends AbstractOperatorDescriptor {
                         public void close() throws HyracksDataException {
 
                         }
+
+                        @Override
+                        public void flush() throws HyracksDataException {
+                        }
                     };
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index 5d300f4..bba18b3 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -173,6 +173,10 @@ class ThreadCountingOperatorDescriptor extends AbstractOperatorDescriptor {
                         public void close() throws HyracksDataException {
 
                         }
+
+                        @Override
+                        public void flush() throws HyracksDataException {
+                        }
                     };
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
index 43e7649..f621bf9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
@@ -34,8 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.FixedSizeFrame;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -45,8 +43,9 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+import org.junit.Test;
 
 public class TopKRunGeneratorTest {
 
@@ -90,14 +89,18 @@ public class TopKRunGeneratorTest {
         public void close() throws HyracksDataException {
             assertTrue(answer.isEmpty());
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     @Test
     public void testReverseOrderedDataShouldNotGenerateAnyRuns() throws HyracksDataException {
         int topK = 1;
         IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
         testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
     }
@@ -106,8 +109,8 @@ public class TopKRunGeneratorTest {
     public void testAlreadySortedDataShouldNotGenerateAnyRuns() throws HyracksDataException {
         int topK = SORT_FRAME_LIMIT;
         IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator sorter = new HeapSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
         testInMemoryOnly(ctx, topK, ORDER.INORDER, sorter);
     }
@@ -116,8 +119,8 @@ public class TopKRunGeneratorTest {
     public void testHybridTopKShouldNotGenerateAnyRuns() throws HyracksDataException {
         int topK = 1;
         IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
         testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
     }
@@ -126,8 +129,8 @@ public class TopKRunGeneratorTest {
     public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() {
         int topK = 1;
         IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK,
-                SortFields, null, ComparatorFactories, RecordDesc);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null,
+                ComparatorFactories, RecordDesc);
 
     }
 
@@ -148,8 +151,8 @@ public class TopKRunGeneratorTest {
         int minRecordSize = 16;
         int maxRecordSize = 64;
 
-        AbstractRunGeneratorTest
-                .prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null, keyValuePair);
+        AbstractRunGeneratorTest.prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null,
+                keyValuePair);
 
         assert topK > 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
index 964280b..eb2714f 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
@@ -59,7 +59,7 @@ public class WordTupleParserFactory implements ITupleParserFactory {
                         FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
                                 tb.getSize());
                     }
-                    appender.flush(writer, true);
+                    appender.write(writer, true);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index 3e0bb14..f6bf66c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -60,7 +60,7 @@ public class TextKeyValueParserFactory implements IKeyValueParserFactory<LongWri
 
             @Override
             public void close(IFrameWriter writer) throws HyracksDataException {
-                appender.flush(writer, false);
+                appender.write(writer, false);
             }
 
         };

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index a97441c..2226b03 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -40,7 +40,8 @@ public class BTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePush
 
     public BTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, IRecordDescriptorProvider recordDescProvider, int[] lowKeyFields, int[] highKeyFields,
-            boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) {
+            boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes,
+            int[] maxFilterFieldIndexes) {
         super(opDesc, ctx, partition, recordDescProvider, minFilterFieldIndexes, maxFilterFieldIndexes);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
@@ -75,8 +76,8 @@ public class BTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePush
         ITreeIndex treeIndex = (ITreeIndex) index;
         lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
         highKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), highKey);
-        return new RangePredicate(lowKey, highKey, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
-                highKeySearchCmp, minFilterKey, maxFilterKey);
+        return new RangePredicate(lowKey, highKey, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp, highKeySearchCmp,
+                minFilterKey, maxFilterKey);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index b5c14c1..6fc26a7 100644
--- a/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -178,7 +178,7 @@ public class FramewriterTest {
                 writer.nextFrame(EMPTY_BUFFER);
                 return null;
             }
-        }).when(appenders[0]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+        }).when(appenders[0]).write(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
 
         appenders[1] = Mockito.mock(FrameTupleAppender.class);
         Mockito.doAnswer(new Answer<Object>() {
@@ -186,7 +186,7 @@ public class FramewriterTest {
             public Object answer(InvocationOnMock invocation) throws Throwable {
                 throw new HyracksDataException("couldn't flush frame");
             }
-        }).when(appenders[1]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+        }).when(appenders[1]).write(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
 
         return appenders;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 631b2f1..59d852e 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -91,7 +91,6 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
             }
         }
         FrameUtils.flushFrame(buffer, writer);
-
     }
 
     @Override
@@ -113,6 +112,11 @@ public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOu
     }
 
     @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
+
+    @Override
     public void fail() throws HyracksDataException {
         if (index != null) {
             writer.fail();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 6c7e8d0..6736420 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -162,4 +162,9 @@ public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryIn
             writer.fail();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 568bde8..c4f3fca 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -188,12 +188,17 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
     }
 
     @Override
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
+
+    @Override
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;
         if (index != null) {
             // if index == null, then the index open was not successful
             try {
-                appender.flush(writer, true);
+                appender.write(writer, true);
             } catch (Throwable th) {
                 closeException = new HyracksDataException(th);
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 08775bb..1b1bee0 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -82,7 +82,7 @@ public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOut
                     FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
                             tb.getSize());
                 }
-                appender.flush(writer, true);
+                appender.write(writer, true);
             } catch (Throwable th) {
                 writer.fail();
                 throw new HyracksDataException(th);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 48e65bb..f70df29 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -84,7 +84,7 @@ public class TreeIndexStatsOperatorNodePushable extends AbstractUnaryOutputSourc
                 throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
                         + appender.getBuffer().capacity() + ")");
             }
-            appender.flush(writer, false);
+            appender.write(writer, false);
         } catch (Exception e) {
             writer.fail();
             throw new HyracksDataException(e);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index 50f7be0..25f2382 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -120,6 +120,6 @@ public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
         for (int i = startTupleIndex; i < endTupleIndex; i++) {
             FrameUtils.appendToWriter(writer, appender, accessor, i);
         }
-        appender.flush(writer, true);
+        appender.write(writer, true);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index c91aff7..002457b 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -154,7 +154,7 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary
     @Override
     public void close() throws HyracksDataException {
         try {
-            appender.flush(writer, true);
+            appender.write(writer, true);
         } finally {
             writer.close();
         }
@@ -164,4 +164,9 @@ public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnary
     public void fail() throws HyracksDataException {
         writer.fail();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
 }


[2/2] incubator-asterixdb-hyracks git commit: Add flush() to IFrameWriter

Posted by am...@apache.org.
Add flush() to IFrameWriter

This change introduces flush() method in frame writers. It is
intended to be used for long running jobs (Feeds) to flush contents
of frames all the way to storage.

Change-Id: I85424bab7965b71aac709280af066e1655457aa3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/584
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/637e9552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/637e9552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/637e9552

Branch: refs/heads/master
Commit: 637e95524fdf5268f9d1c5b8d488a64eec57138e
Parents: 0b64267
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Tue Jan 26 21:57:52 2016 +0400
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Jan 29 15:45:47 2016 -0800

----------------------------------------------------------------------
 ...estedPlansAccumulatingAggregatorFactory.java |  7 ++--
 .../NestedPlansRunningAggregatorFactory.java    | 15 +++++---
 ...actOneInputOneOutputOneFramePushRuntime.java |  2 +-
 .../operators/base/SinkRuntimeFactory.java      |  5 +++
 .../MicroPreClusteredGroupRuntimeFactory.java   |  6 ++-
 .../meta/AlgebricksMetaOperatorDescriptor.java  |  5 +++
 .../operators/meta/SubplanRuntimeFactory.java   |  5 ++-
 .../operators/std/AssignRuntimeFactory.java     |  5 +++
 .../std/EmptyTupleSourceRuntimeFactory.java     |  7 +++-
 .../std/NestedTupleSourceRuntimeFactory.java    |  5 ++-
 .../PartitioningSplitOperatorDescriptor.java    | 12 +++++-
 .../operators/std/PrinterRuntimeFactory.java    |  2 +-
 .../std/RunningAggregateRuntimeFactory.java     |  4 ++
 .../operators/std/SinkWriterRuntime.java        | 17 +++++----
 .../operators/std/SinkWriterRuntimeFactory.java |  2 +-
 .../std/StreamLimitRuntimeFactory.java          |  5 +++
 .../std/StreamProjectRuntimeFactory.java        |  5 +++
 .../std/StreamSelectRuntimeFactory.java         |  5 +++
 .../std/StringStreamingRuntimeFactory.java      |  5 +++
 .../operators/std/UnnestRuntimeFactory.java     |  5 +++
 .../apache/hyracks/api/comm/IFrameAppender.java | 18 +++++++--
 .../apache/hyracks/api/comm/IFrameWriter.java   | 14 ++++---
 .../hyracks/api/dataflow/IDataWriter.java       | 11 +++---
 .../comm/channels/NetworkOutputChannel.java     |  6 +++
 .../nc/dataset/DatasetPartitionWriter.java      |  4 --
 .../MaterializedPartitionInputChannel.java      |  7 +++-
 .../partitions/MaterializedPartitionWriter.java |  5 +++
 .../MaterializingPipelinedPartition.java        | 24 ++++++++++--
 .../nc/partitions/PipelinedPartition.java       | 10 ++++-
 .../ConnectorSenderProfilingFrameWriter.java    |  5 +++
 .../ProfilingPartitionWriterFactory.java        | 10 ++++-
 .../common/comm/io/AbstractFrameAppender.java   | 12 +++---
 .../comm/io/FrameDeserializingDataWriter.java   |  5 +++
 .../common/comm/io/FrameFixedFieldAppender.java |  4 +-
 .../comm/io/FrameFixedFieldTupleAppender.java   |  4 +-
 .../common/comm/io/FrameOutputStream.java       |  2 +-
 .../comm/io/FrameTupleAppenderWrapper.java      | 13 ++++---
 .../common/comm/io/SerializingDataWriter.java   |  8 +++-
 .../dataflow/common/comm/util/FrameUtils.java   | 20 +++++-----
 .../dataflow/common/io/RunFileWriter.java       |  5 +++
 .../FrameFixedFieldTupleAppenderTest.java       | 39 ++++++++++----------
 ...tractUnaryInputSinkOperatorNodePushable.java |  6 +++
 .../LocalityAwarePartitionDataWriter.java       |  9 ++++-
 .../MToNReplicatingConnectorDescriptor.java     |  9 ++++-
 .../std/connectors/PartitionDataWriter.java     | 10 ++++-
 ...tDeserializedFileScanOperatorDescriptor.java |  5 +++
 .../AbstractFileWriteOperatorDescriptor.java    |  5 +++
 .../file/DelimitedDataTupleParserFactory.java   |  2 +-
 .../DeserializedPreclusteredGroupOperator.java  |  8 ++++
 .../std/group/HashSpillableTableFactory.java    |  8 ++--
 .../ExternalGroupMergeOperatorNodePushable.java |  4 +-
 .../std/group/hash/GroupingHashTable.java       |  2 +-
 .../PreclusteredGroupOperatorNodePushable.java  |  5 +++
 .../preclustered/PreclusteredGroupWriter.java   |  2 +-
 .../dataflow/std/join/InMemoryHashJoin.java     |  2 +-
 .../dataflow/std/join/NestedLoopJoin.java       |  2 +-
 ...timizedHybridHashJoinOperatorDescriptor.java | 10 ++---
 .../DeserializedMapperOperatorDescriptor.java   |  4 ++
 ...ConstantTupleSourceOperatorNodePushable.java |  2 +-
 .../std/misc/IdentityOperatorDescriptor.java    |  7 +++-
 .../std/misc/LimitOperatorDescriptor.java       |  9 ++++-
 .../misc/MaterializingOperatorDescriptor.java   | 19 ++++------
 .../std/misc/PrinterOperatorDescriptor.java     |  5 +++
 .../std/misc/SplitOperatorDescriptor.java       |  7 ++++
 .../std/misc/SplitVectorOperatorDescriptor.java | 29 ++++++++++-----
 .../dataflow/std/sort/AbstractFrameSorter.java  |  2 +-
 .../std/sort/ExternalSortRunGenerator.java      | 13 ++++---
 .../dataflow/std/sort/HeapSortRunGenerator.java |  8 ++--
 .../dataflow/std/sort/TupleSorterHeapSort.java  |  2 +-
 .../std/union/UnionAllOperatorDescriptor.java   |  7 ++++
 .../util/DeserializedOperatorNodePushable.java  |  5 +++
 .../btree/helper/DataGenOperatorDescriptor.java |  4 +-
 .../comm/SerializationDeserializationTest.java  |  7 +++-
 .../tests/rewriting/ErrorReportingTest.java     |  4 ++
 .../rewriting/SuperActivityRewritingTest.java   |  4 ++
 .../tests/unit/TopKRunGeneratorTest.java        | 29 ++++++++-------
 .../examples/text/WordTupleParserFactory.java   |  2 +-
 .../hdfs/lib/TextKeyValueParserFactory.java     |  2 +-
 .../BTreeSearchOperatorNodePushable.java        |  7 ++--
 .../storage/am/btree/test/FramewriterTest.java  |  4 +-
 .../IndexBulkLoadOperatorNodePushable.java      |  6 ++-
 ...xInsertUpdateDeleteOperatorNodePushable.java |  5 +++
 .../IndexSearchOperatorNodePushable.java        |  7 +++-
 ...eIndexDiskOrderScanOperatorNodePushable.java |  2 +-
 .../TreeIndexStatsOperatorNodePushable.java     |  2 +-
 ...xInsertUpdateDeleteOperatorNodePushable.java |  2 +-
 .../BinaryTokenizerOperatorNodePushable.java    |  7 +++-
 87 files changed, 466 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index fefc72e..9f0960d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -54,8 +54,7 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
 
-        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length,
-                decorFieldIdx.length);
+        final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
             try {
@@ -132,8 +131,8 @@ public class NestedPlansAccumulatingAggregatorFactory extends AbstractAccumulati
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) throws HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 2ca933a..4b63155 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -127,8 +127,8 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
             }
 
             @Override
-            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
-                    int tIndex, AggregateState state) throws HyracksDataException {
+            public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+                    AggregateState state) throws HyracksDataException {
                 throw new IllegalStateException("this method should not be called");
             }
 
@@ -219,14 +219,14 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
                 for (int f = 0; f < w; f++) {
                     tb.addField(accessor, tIndex, f);
                 }
-                FrameUtils.appendToWriter(outputWriter, outputAppender, tb.getFieldEndOffsets(),
-                        tb.getByteArray(), 0, tb.getSize());
+                FrameUtils.appendToWriter(outputWriter, outputAppender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+                        tb.getSize());
             }
         }
 
         @Override
         public void close() throws HyracksDataException {
-            outputAppender.flush(outputWriter, true);
+            outputAppender.write(outputWriter, true);
         }
 
         public void setInputIdx(int inputIdx) {
@@ -241,6 +241,11 @@ public class NestedPlansRunningAggregatorFactory implements IAggregatorDescripto
         public void fail() throws HyracksDataException {
         }
 
+        @Override
+        public void flush() throws HyracksDataException {
+            outputAppender.flush(outputWriter);
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index e57a4ba..e94f4b7 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -60,7 +60,7 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
 
     protected void flushAndReset() throws HyracksDataException {
         if (appender.getTupleCount() > 0) {
-            appender.flush(writer, true);
+            appender.write(writer, true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index 69876b0..d1fb703 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -57,6 +57,11 @@ public class SinkRuntimeFactory implements IPushRuntimeFactory {
             @Override
             public void close() throws HyracksDataException {
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                // flush() is meaningless for sink operators
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index b7a73d4..2368a9c 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -29,7 +29,6 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
@@ -90,6 +89,11 @@ public class MicroPreClusteredGroupRuntimeFactory extends AbstractOneInputOneOut
             public void close() throws HyracksDataException {
                 pgw.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                pgw.flush();
+            }
         };
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 8322cdc..f2afe98 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -157,6 +157,11 @@ public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOper
             public void fail() throws HyracksDataException {
                 startOfPipeline.fail();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                startOfPipeline.flush();
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 25eb229..ac9bae2 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -136,7 +136,6 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
                         tb.addFieldEndOffset();
                     }
                 }
-
             }
 
             IFrameWriter endPipe = new TupleOuterProduct();
@@ -166,6 +165,10 @@ public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFacto
                 }
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 79d9b7d..1877d64 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -171,6 +171,11 @@ public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                     writer.fail();
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer);
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index a2b9652..2b7c2da 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -52,13 +52,18 @@ public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
                 if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                     throw new IllegalStateException();
                 }
-                appender.flush(writer, true);
+                appender.write(writer, true);
             }
 
             @Override
             public void close() throws HyracksDataException {
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer);
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 5fcb9ef..0f1e50d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -69,8 +69,9 @@ public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
             writer.fail();
         }
 
-        public void forceFlush() throws HyracksDataException {
-            appender.flush(writer, true);
+        @Override
+        public void flush() throws HyracksDataException {
+            writer.flush();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index 3d1eb06..b7f11d8 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -89,8 +89,7 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
                     if (isOpen[i]) {
                         try {
                             tupleAppender.reset(writeBuffers[i], false);
-                            // ? by JF why didn't clear the buffer ?
-                            tupleAppender.flush(writers[i], false);
+                            tupleAppender.write(writers[i], false);
                         } catch (Throwable th) {
                             if (hde == null) {
                                 hde = new HyracksDataException();
@@ -114,6 +113,14 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
             }
 
             @Override
+            public void flush() throws HyracksDataException {
+                for (int i = 0; i < outputArity; i++) {
+                    tupleAppender.reset(writeBuffers[i], false);
+                    tupleAppender.flush(writers[i]);
+                }
+            }
+
+            @Override
             public void fail() throws HyracksDataException {
                 HyracksDataException hde = null;
                 for (int i = 0; i < outputArity; i++) {
@@ -172,6 +179,7 @@ public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityO
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }
+                tupleAppender.reset(writeBuffers[outputIndex], false);
                 FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
                         tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index b77f7b8..021784a 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -59,6 +59,6 @@ public class PrinterRuntimeFactory implements IPushRuntimeFactory {
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
         IAWriter w = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, printerFactories,
                 inputRecordDesc);
-        return new SinkWriterRuntime(w, ctx, System.out, inputRecordDesc);
+        return new SinkWriterRuntime(w, System.out, inputRecordDesc);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 8a5f38c..5a26f36 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -160,6 +160,10 @@ public class RunningAggregateRuntimeFactory extends AbstractOneInputOneOutputRun
                 }
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer);
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index c069e4c..2512c5e 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -24,14 +24,12 @@ import java.nio.ByteBuffer;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.data.IAWriter;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
 
-    private final IHyracksTaskContext ctx;
     private final PrintStream printStream;
     private final IAWriter writer;
     private RecordDescriptor inputRecordDesc;
@@ -39,18 +37,16 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
     private boolean autoClose = false;
     private boolean first = true;
 
-    public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
-            RecordDescriptor inputRecordDesc) {
+    public SinkWriterRuntime(IAWriter writer, PrintStream printStream, RecordDescriptor inputRecordDesc) {
         this.writer = writer;
-        this.ctx = ctx;
         this.printStream = printStream;
         this.inputRecordDesc = inputRecordDesc;
         this.tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
-    public SinkWriterRuntime(IAWriter writer, IHyracksTaskContext ctx, PrintStream printStream,
-            RecordDescriptor inputRecordDesc, boolean autoClose) {
-        this(writer, ctx, printStream, inputRecordDesc);
+    public SinkWriterRuntime(IAWriter writer, PrintStream printStream, RecordDescriptor inputRecordDesc,
+            boolean autoClose) {
+        this(writer, printStream, inputRecordDesc);
         this.autoClose = autoClose;
     }
 
@@ -95,4 +91,9 @@ public class SinkWriterRuntime extends AbstractOneInputSinkPushRuntime {
     @Override
     public void fail() throws HyracksDataException {
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // flush() makes no sense to sink operators
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index 3f66e23..62e3542 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -75,6 +75,6 @@ public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
             throw new AlgebricksException(e);
         }
         IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc);
-        return new SinkWriterRuntime(w, ctx, filePrintStream, inputRecordDesc, true);
+        return new SinkWriterRuntime(w, filePrintStream, inputRecordDesc, true);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index ef172c7..11f47ac 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -147,6 +147,11 @@ public class StreamLimitRuntimeFactory extends AbstractOneInputOneOutputRuntimeF
                 return lim;
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer);
+            }
+
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 2cea90d..001a598 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -84,6 +84,11 @@ public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntim
 
             }
 
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer);
+            }
+
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 75c3d08..5eb4604 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -157,6 +157,11 @@ public class StreamSelectRuntimeFactory extends AbstractOneInputOneOutputRuntime
                     }
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer);
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 395d321..a9f5a21 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -188,6 +188,11 @@ public class StringStreamingRuntimeFactory extends AbstractOneInputOneOutputRunt
                 // close the following operator in the chain
                 super.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                ps.flush();
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 6b21cda..2c04003 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -159,6 +159,11 @@ public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactor
                     }
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer);
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 18e44cc..e48e4d6 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -48,12 +48,22 @@ public interface IFrameAppender {
     ByteBuffer getBuffer();
 
     /**
-     * Flush the frame content to the given writer.
-     * Clear the inner buffer after flush if {@code clear} is <code>true</code>.
+     * Write the frame content to the given writer.
+     * Clear the inner buffer after write if {@code clear} is <code>true</code>.
      *
      * @param outWriter the output writer
-     * @param clear     indicate whether to clear the inside frame after flushed or not.
+     * @param clear     indicate whether to clear the inside frame after writing or not.
      * @throws HyracksDataException
      */
-    void flush(IFrameWriter outWriter, boolean clear) throws HyracksDataException;
+    void write(IFrameWriter outWriter, boolean clear) throws HyracksDataException;
+
+    /**
+     * Write currently buffered records to {@code writer} then flushes {@code writer}. The inside frame is always cleared
+     * @param writer the FrameWriter to write to and flush
+     * @throws HyracksDataException
+     */
+    public default void flush(IFrameWriter writer) throws HyracksDataException {
+        write(writer, true);
+        writer.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index f6c3ad0..87c5223 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -39,14 +39,13 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
  * <ul>
  * <li>{@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
  * <li>{@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, the {@link IFrameWriter} remains in the OPENED state.</li>
+ * <li>{@link IFrameWriter#flush()} to push tuples that are ready in the output frame. The {@link IFrameWriter} remains in the OPENED state.</li>
  * <li>{@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
  * </ul>
  * In the FAILED state, the only call allowed is the {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
  * state and give up all resources.
  * No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
  * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} must still be called by the producer.
- *
- * @author vinayakb
  */
 public interface IFrameWriter {
     /**
@@ -56,7 +55,6 @@ public interface IFrameWriter {
 
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
-     *
      * @param buffer
      *            - Buffer containing data.
      * @throws HyracksDataException
@@ -64,16 +62,22 @@ public interface IFrameWriter {
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
 
     /**
+     * request the frame to push its content forward and flush its consumers
+     * @throws HyracksDataException
+     */
+    public default void flush() throws HyracksDataException {
+        throw new HyracksDataException("flush() is not supported in this IFrameWriter");
+    }
+
+    /**
      * Indicate that a failure was encountered and the current stream is to be
      * aborted.
-     *
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
      * Close this {@link IFrameWriter} and give up all resources.
-     *
      * @throws HyracksDataException
      */
     public void close() throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
index a5d3d4e..0b8ffc0 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IDataWriter.java
@@ -22,13 +22,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Accepts data from data producers.
- * 
- * @author vinayakb
  */
 public interface IDataWriter<T> {
     /**
      * Pushes data to the acceptor.
-     * 
      * @param data
      *            - Data pushed to the acceptor. <code>null</code> indicates the
      *            end of stream.
@@ -38,14 +35,18 @@ public interface IDataWriter<T> {
 
     /**
      * Indicates that the stream has failed.
-     * 
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
+     * Request the writer to flush its content
+     * @throws HyracksDataException
+     */
+    public void flush() throws HyracksDataException;
+
+    /**
      * Closes this writer.
-     * 
      * @throws Exception
      */
     public void close() throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index ed4279d..b5f359c 100644
--- a/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -118,4 +118,10 @@ public class NetworkOutputChannel implements IFrameWriter {
             }
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // At the network boundary.
+        // This frame writer always pushes its content
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 9fa511c..e007050 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -41,8 +41,6 @@ public class DatasetPartitionWriter implements IFrameWriter {
 
     private final ResultSetId resultSetId;
 
-    private final boolean asyncMode;
-
     private final boolean orderedResult;
 
     private final int partition;
@@ -63,7 +61,6 @@ public class DatasetPartitionWriter implements IFrameWriter {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
-        this.asyncMode = asyncMode;
         this.orderedResult = orderedResult;
         this.partition = partition;
         this.nPartitions = nPartitions;
@@ -139,5 +136,4 @@ public class DatasetPartitionWriter implements IFrameWriter {
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index ef4c552..a1f4f5f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -118,7 +118,7 @@ public class MaterializedPartitionInputChannel implements IInputChannel {
                 ByteBuffer destFrame = emptyQueue.poll();
                 buffer.position(0);
                 buffer.limit(buffer.capacity());
-                if (destFrame.capacity() < buffer.capacity()){
+                if (destFrame.capacity() < buffer.capacity()) {
                     throw new HyracksDataException("should never happen");
                 }
                 destFrame.clear();
@@ -137,5 +137,10 @@ public class MaterializedPartitionInputChannel implements IInputChannel {
         public void close() throws HyracksDataException {
             monitor.notifyEndOfStream(MaterializedPartitionInputChannel.this);
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            // materialize operators should only send their output once all of their input has been consumed. hence, this is a no op
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 291064d..09d62c5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -103,4 +103,9 @@ public class MaterializedPartitionWriter implements IFrameWriter {
 
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // materialize writer is kind of a sink operator, hence, flush() is a no op.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 07b5c53..40b719b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -60,6 +60,8 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
 
     private boolean failed;
 
+    protected boolean flushRequest;
+
     public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
             TaskAttemptId taId, Executor executor) {
         this.ctx = ctx;
@@ -93,8 +95,9 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
                             MaterializingPipelinedPartition.this.wait();
                         }
                     }
-                    IFileHandle fh = fRef == null ? null : ioManager.open(fRef,
-                            IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    IFileHandle fh = fRef == null ? null
+                            : ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
                         writer.open();
                         try {
@@ -103,9 +106,14 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
                                 ByteBuffer buffer = ctx.allocateFrame();
                                 boolean fail = false;
                                 boolean done = false;
+                                boolean flush = false;
                                 while (!fail && !done) {
                                     synchronized (MaterializingPipelinedPartition.this) {
-                                        while (offset >= size && !eos && !failed) {
+                                        if (flushRequest) {
+                                            flushRequest = false;
+                                            flush = true;
+                                        }
+                                        while (offset >= size && !eos && !failed && !flush) {
                                             try {
                                                 MaterializingPipelinedPartition.this.wait();
                                             } catch (InterruptedException e) {
@@ -126,6 +134,10 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
                                         offset += readLen;
                                         buffer.flip();
                                         writer.nextFrame(buffer);
+                                        if (flush) {
+                                            writer.flush();
+                                            flush = false;
+                                        }
                                     }
                                 }
                             }
@@ -195,4 +207,10 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
             notifyAll();
         }
     }
+
+    @Override
+    public synchronized void flush() throws HyracksDataException {
+        flushRequest = true;
+        notifyAll();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 2ca1e0f..b83e0f0 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -80,8 +80,8 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        ensureConnected();
         if (!failed) {
+            ensureConnected();
             delegate.nextFrame(buffer);
         }
     }
@@ -117,4 +117,12 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
             delegate.close();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        if (!failed) {
+            ensureConnected();
+            delegate.flush();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
index a46fa7b..9023120 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -69,4 +69,9 @@ public class ConnectorSenderProfilingFrameWriter implements IFrameWriter {
     public void fail() throws HyracksDataException {
         writer.fail();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index 5e543fa..54ef732 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -80,10 +80,16 @@ public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory
             @Override
             public void close() throws HyracksDataException {
                 closeTime = System.currentTimeMillis();
-                ((Task) ctx).setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext()
-                        .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
+                ((Task) ctx)
+                        .setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(),
+                                cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
                 writer.close();
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         };
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 7b18c3a..fd71716 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -47,11 +47,10 @@ public class AbstractFrameAppender implements IFrameAppender {
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
         return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
-                + tupleCount * FrameConstants.SIZE_LEN
-                <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
+                + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 
-    private void reset(ByteBuffer buffer, boolean clear) {
+    protected void reset(ByteBuffer buffer, boolean clear) {
         array = buffer.array();
         if (clear) {
             IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), 0);
@@ -59,9 +58,8 @@ public class AbstractFrameAppender implements IFrameAppender {
             tupleDataEndOffset = FrameConstants.TUPLE_START_OFFSET;
         } else {
             tupleCount = IntSerDeUtils.getInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()));
-            tupleDataEndOffset = tupleCount == 0 ?
-                    FrameConstants.TUPLE_START_OFFSET :
-                    IntSerDeUtils.getInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize())
+            tupleDataEndOffset = tupleCount == 0 ? FrameConstants.TUPLE_START_OFFSET
+                    : IntSerDeUtils.getInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize())
                             - tupleCount * FrameConstants.SIZE_LEN);
         }
     }
@@ -77,7 +75,7 @@ public class AbstractFrameAppender implements IFrameAppender {
     }
 
     @Override
-    public void flush(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         getBuffer().clear();
         if (getTupleCount() > 0) {
             outWriter.nextFrame(getBuffer());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index 7d9d643..3657b7e 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -58,4 +58,9 @@ public class FrameDeserializingDataWriter implements IFrameWriter {
     @Override
     public void fail() {
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
index 996f296..21a7a71 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
@@ -50,8 +50,8 @@ public class FrameFixedFieldAppender extends AbstractFrameAppender implements IF
     }
 
     @Override
-    public void flush(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
-        super.flush(outWriter, clearFrame);
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        super.write(outWriter, clearFrame);
         if (clearFrame) {
             if (leftOverSize > 0) {
                 if (!canHoldNewTuple(0, leftOverSize)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java
index 616b8ca..9b9e803 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldTupleAppender.java
@@ -142,7 +142,7 @@ public class FrameFixedFieldTupleAppender implements IFrameTupleAppender, IFrame
     }
 
     @Override
-    public void flush(IFrameWriter outWriter, boolean clear) throws HyracksDataException {
-        lastAppender.flush(outWriter, clear);
+    public void write(IFrameWriter outWriter, boolean clear) throws HyracksDataException {
+        lastAppender.write(outWriter, clear);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java
index 509fe6d..e8f826f 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -58,6 +58,6 @@ public class FrameOutputStream extends ByteArrayAccessibleOutputStream {
     }
 
     public void flush(IFrameWriter writer) throws HyracksDataException {
-        frameTupleAppender.flush(writer, true);
+        frameTupleAppender.write(writer, true);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
index 847deb8..77495dd 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -46,8 +46,12 @@ public class FrameTupleAppenderWrapper {
         outputWriter.open();
     }
 
+    public void write() throws HyracksDataException {
+        frameTupleAppender.write(outputWriter, true);
+    }
+
     public void flush() throws HyracksDataException {
-        frameTupleAppender.flush(outputWriter, true);
+        frameTupleAppender.flush(outputWriter);
     }
 
     public void close() throws HyracksDataException {
@@ -64,8 +68,7 @@ public class FrameTupleAppenderWrapper {
 
     public void appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
             throws HyracksDataException {
-        FrameUtils.appendSkipEmptyFieldToWriter(outputWriter, frameTupleAppender,
-                fieldSlots, bytes, offset, length);
+        FrameUtils.appendSkipEmptyFieldToWriter(outputWriter, frameTupleAppender, fieldSlots, bytes, offset, length);
     }
 
     public void append(byte[] bytes, int offset, int length) throws HyracksDataException {
@@ -88,8 +91,8 @@ public class FrameTupleAppenderWrapper {
 
     public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
             int dataLen1) throws HyracksDataException {
-        FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, accessor0, tIndex0,
-                fieldSlots1, bytes1, offset1, dataLen1);
+        FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, accessor0, tIndex0, fieldSlots1, bytes1,
+                offset1, dataLen1);
     }
 
     public void appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 918ebd9..365b01c 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -62,11 +62,12 @@ public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
         if (!open) {
             throw new HyracksDataException("Closing SerializingDataWriter that has not been opened");
         }
-        tupleAppender.flush(frameWriter, true);
+        tupleAppender.write(frameWriter, true);
         frameWriter.close();
         open = false;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void writeData(Object[] data) throws HyracksDataException {
         if (!open) {
@@ -88,4 +89,9 @@ public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
     public void fail() throws HyracksDataException {
         frameWriter.fail();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        tupleAppender.flush(frameWriter);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java
index 56eef43..5aad232 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/util/FrameUtils.java
@@ -65,7 +65,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
             flushedBytes = frameTupleAppender.getBuffer().capacity();
-            frameTupleAppender.flush(writer, true);
+            frameTupleAppender.write(writer, true);
             if (!frameTupleAppender.appendSkipEmptyField(fieldSlots, bytes, offset, length)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
             }
@@ -91,7 +91,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!frameTupleAppender.append(bytes, offset, length)) {
             flushedBytes = frameTupleAppender.getBuffer().capacity();
-            frameTupleAppender.flush(writer, true);
+            frameTupleAppender.write(writer, true);
             if (!frameTupleAppender.append(bytes, offset, length)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
             }
@@ -114,7 +114,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
             flushedBytes = frameTupleAppender.getBuffer().capacity();
-            frameTupleAppender.flush(writer, true);
+            frameTupleAppender.write(writer, true);
             if (!frameTupleAppender.append(tupleAccessor, tStartOffset, tEndOffset)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
             }
@@ -135,7 +135,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
             flushedBytes = frameTupleAppender.getBuffer().capacity();
-            frameTupleAppender.flush(writer, true);
+            frameTupleAppender.write(writer, true);
             if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
             }
@@ -159,7 +159,7 @@ public class FrameUtils {
         if (!tupleAppender.append(fieldEndOffsets, byteArray, start, size)) {
 
             flushedBytes = tupleAppender.getBuffer().capacity();
-            tupleAppender.flush(writer, true);
+            tupleAppender.write(writer, true);
 
             if (!tupleAppender.append(fieldEndOffsets, byteArray, start, size)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
@@ -184,7 +184,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
             flushedBytes = frameTupleAppender.getBuffer().capacity();
-            frameTupleAppender.flush(writer, true);
+            frameTupleAppender.write(writer, true);
             if (!frameTupleAppender.appendConcat(accessor0, tIndex0, accessor1, tIndex1)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
             }
@@ -210,7 +210,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
             flushedBytes = frameTupleAppender.getBuffer().capacity();
-            frameTupleAppender.flush(writer, true);
+            frameTupleAppender.write(writer, true);
             if (!frameTupleAppender.appendConcat(accessor0, tIndex0, fieldSlots1, bytes1, offset1, dataLen1)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
             }
@@ -232,7 +232,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
             flushedBytes = frameTupleAppender.getBuffer().capacity();
-            frameTupleAppender.flush(writer, true);
+            frameTupleAppender.write(writer, true);
             if (!frameTupleAppender.appendProjection(accessor, tIndex, fields)) {
                 throw new HyracksDataException("The output cannot be fit into a frame.");
             }
@@ -254,7 +254,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!appender.appendField(array, start, length)) {
             flushedBytes = appender.getBuffer().capacity();
-            appender.flush(writer, true);
+            appender.write(writer, true);
             if (!appender.appendField(array, start, length)) {
                 throw new HyracksDataException("Could not write frame: the size of the tuple is too long");
             }
@@ -276,7 +276,7 @@ public class FrameUtils {
         int flushedBytes = 0;
         if (!appender.appendField(accessor, tid, fid)) {
             flushedBytes = appender.getBuffer().capacity();
-            appender.flush(writer, true);
+            appender.write(writer, true);
             if (!appender.appendField(accessor, tid, fid)) {
                 throw new HyracksDataException("Could not write frame: the size of the tuple is too long");
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index 2af40fd..722e9dc 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -86,4 +86,9 @@ public class RunFileWriter implements IFrameWriter {
         }
         return new RunFileReader(file, ioManager, size, true);
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // this is a kind of a sink operator and hence, flush() is a no op
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java b/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
index 05710ad..699674e 100644
--- a/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
+++ b/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/comm/io/largeobject/FrameFixedFieldTupleAppenderTest.java
@@ -26,9 +26,6 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameTupleAppender;
@@ -44,6 +41,8 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.junit.Before;
+import org.junit.Test;
 
 public class FrameFixedFieldTupleAppenderTest {
 
@@ -51,22 +50,19 @@ public class FrameFixedFieldTupleAppenderTest {
     static final int TEST_FRAME_SIZE = 256;
 
     FrameFixedFieldAppender appender;
-    static ISerializerDeserializer[] fields = new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE,
-            new UTF8StringSerializerDeserializer(),
-            IntegerSerializerDeserializer.INSTANCE,
-            new UTF8StringSerializerDeserializer(),
-    };
+    static ISerializerDeserializer[] fields = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), };
     static RecordDescriptor recordDescriptor = new RecordDescriptor(fields);
     static ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptor.getFieldCount());
 
-    class SequetialDataVerifier implements IFrameWriter {
+    class SequentialDataVerifier implements IFrameWriter {
 
         private final IFrameTupleAccessor accessor;
         private IFrameTupleAccessor innerAccessor;
         private int tid;
 
-        public SequetialDataVerifier(IFrameTupleAccessor accessor) {
+        public SequentialDataVerifier(IFrameTupleAccessor accessor) {
             this.accessor = accessor;
             this.innerAccessor = new FrameTupleAccessor(recordDescriptor);
         }
@@ -87,7 +83,8 @@ public class FrameFixedFieldTupleAppenderTest {
         private void validate(IFrameTupleAccessor innerAccessor, int i) {
             assertTrue(tid < accessor.getTupleCount());
             assertEquals(accessor.getTupleLength(tid), innerAccessor.getTupleLength(i));
-            assertArrayEquals(Arrays.copyOfRange(accessor.getBuffer().array(), accessor.getTupleStartOffset(tid),
+            assertArrayEquals(
+                    Arrays.copyOfRange(accessor.getBuffer().array(), accessor.getTupleStartOffset(tid),
                             accessor.getTupleEndOffset(tid)),
                     Arrays.copyOfRange(innerAccessor.getBuffer().array(), innerAccessor.getTupleStartOffset(i),
                             innerAccessor.getTupleEndOffset(i)));
@@ -103,6 +100,10 @@ public class FrameFixedFieldTupleAppenderTest {
         public void close() throws HyracksDataException {
             assertEquals(accessor.getTupleCount(), tid);
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     @Before
@@ -119,13 +120,13 @@ public class FrameFixedFieldTupleAppenderTest {
         for (int tid = 0; tid < accessor.getTupleCount(); tid++) {
             for (int fid = 0; fid < fields.length; fid++) {
                 if (!appender.appendField(accessor, tid, fid)) {
-                    appender.flush(writer, true);
+                    appender.write(writer, true);
                     if (!appender.appendField(accessor, tid, fid)) {
                     }
                 }
             }
         }
-        appender.flush(writer, true);
+        appender.write(writer, true);
         writer.close();
     }
 
@@ -143,7 +144,7 @@ public class FrameFixedFieldTupleAppenderTest {
     }
 
     private IFrameWriter prepareValidator(IFrameTupleAccessor accessor) throws HyracksDataException {
-        return new SequetialDataVerifier(accessor);
+        return new SequentialDataVerifier(accessor);
     }
 
     enum DATA_TYPE {
@@ -154,8 +155,8 @@ public class FrameFixedFieldTupleAppenderTest {
 
     private IFrameTupleAccessor prepareData(DATA_TYPE type) throws HyracksDataException {
         IFrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptor);
-        IFrameTupleAppender appender = new FrameTupleAppender(
-                new VSizeFrame(new FrameManager(INPUT_BUFFER_SIZE)), true);
+        IFrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(new FrameManager(INPUT_BUFFER_SIZE)),
+                true);
         int i = 0;
         do {
             switch (type) {
@@ -169,8 +170,8 @@ public class FrameFixedFieldTupleAppenderTest {
                     makeABigObjectTuple(tupleBuilder, i++);
                     break;
             }
-        } while (appender
-                .append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize()));
+        } while (appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize()));
         accessor.reset(appender.getBuffer());
         return accessor;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
index 52b4abe..1aa48e5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputSinkOperatorNodePushable.java
@@ -20,10 +20,16 @@ package org.apache.hyracks.dataflow.std.base;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractUnaryInputSinkOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
     @Override
     public final void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         throw new IllegalStateException();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // Sink operator, nothing to flush
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index ee8c656..830075c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -127,7 +127,7 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
         for (int i = 0; i < pWriters.length; ++i) {
             if (isWriterOpen[i]) {
                 try {
-                    appenders[i].flush(pWriters[i], true);
+                    appenders[i].write(pWriters[i], true);
                 } catch (Throwable th) {
                     if (closeException == null) {
                         closeException = new HyracksDataException(th);
@@ -151,4 +151,11 @@ public class LocalityAwarePartitionDataWriter implements IFrameWriter {
             throw closeException;
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        for (int i = 0; i < pWriters.length; ++i) {
+            appenders[i].flush(pWriters[i]);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 4d73fa5..7d60ce5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -113,6 +113,13 @@ public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDes
                     epWriters[i].open();
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                for (IFrameWriter writer : epWriters) {
+                    writer.flush();
+                }
+            }
         };
     }
 
@@ -126,4 +133,4 @@ public class MToNReplicatingConnectorDescriptor extends AbstractMToNConnectorDes
         NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/637e9552/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 336272c..646883f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -68,7 +68,7 @@ public class PartitionDataWriter implements IFrameWriter {
             if (isOpen[i]) {
                 if (allocatedFrame) {
                     try {
-                        appenders[i].flush(pWriters[i], true);
+                        appenders[i].write(pWriters[i], true);
                     } catch (Throwable th) {
                         if (closeException == null) {
                             closeException = new HyracksDataException(th);
@@ -112,7 +112,6 @@ public class PartitionDataWriter implements IFrameWriter {
         for (int i = 0; i < tupleCount; ++i) {
             int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
             FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
-
         }
     }
 
@@ -142,4 +141,11 @@ public class PartitionDataWriter implements IFrameWriter {
             throw failException;
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        for (int i = 0; i < consumerPartitionCount; i++) {
+            appenders[i].flush(pWriters[i]);
+        }
+    }
 }