You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/18 08:47:33 UTC

incubator-asterixdb-hyracks git commit: Support Sending Messages Alongside Frame Data

Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master e00dbb909 -> c6f2d18df


Support Sending Messages Alongside Frame Data

This change supports sending messages along with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context, which holds it as the object shared between the different
operators in the pipeline. The first use of this feature will be within
feeds, to request acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Reviewed-on: https://asterix-gerrit.ics.uci.edu/604
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mi...@couchbase.com>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/c6f2d18d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/c6f2d18d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/c6f2d18d

Branch: refs/heads/master
Commit: c6f2d18dfc52a2435ea9a319524b69b87af10f09
Parents: e00dbb9
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Wed Feb 17 15:23:01 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Wed Feb 17 23:42:13 2016 -0800

----------------------------------------------------------------------
 .../apache/hyracks/api/comm/FrameHelper.java    |  4 +-
 .../api/context/IHyracksTaskContext.java        |  4 +
 .../api/dataflow/IConnectorDescriptor.java      |  8 +-
 .../org/apache/hyracks/control/nc/Task.java     | 12 +++
 .../common/comm/io/AbstractFrameAppender.java   | 14 ++-
 .../common/comm/io/FrameTupleAppender.java      | 96 ++++++++++----------
 .../common/io/MessagingFrameTupleAppender.java  | 75 +++++++++++++++
 .../dataflow/common/util/IntSerDeUtils.java     |  7 ++
 .../base/AbstractMToNConnectorDescriptor.java   |  2 +-
 ...wareMToNPartitioningConnectorDescriptor.java |  6 +-
 .../MToNPartitioningConnectorDescriptor.java    | 16 ++--
 ...titioningWithMessageConnectorDescriptor.java | 45 +++++++++
 .../connectors/OneToOneConnectorDescriptor.java | 10 +-
 .../std/connectors/PartitionDataWriter.java     |  6 +-
 .../PartitionWithMessageDataWriter.java         | 41 +++++++++
 .../file/DelimitedDataTupleParserFactory.java   |  4 +-
 .../dataflow/std/file/ITupleParserFactory.java  |  4 +-
 .../examples/text/WordTupleParserFactory.java   |  4 +-
 .../hyracks/test/support/TestTaskContext.java   |  9 ++
 19 files changed, 289 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
index eb6b888..473f3ae 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
@@ -54,7 +54,7 @@ public class FrameHelper {
      * nbytes the actual data.
      * If the tupleLength includes the field slot, please set the fieldCount = 0
      */
-    public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+    public static int calcRequiredSpace(int fieldCount, int tupleLength) {
         return 4 + fieldCount * 4 + tupleLength;
     }
 
@@ -68,7 +68,7 @@ public class FrameHelper {
      */
     public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) {
         assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
-        return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+        return (1 + (calcRequiredSpace(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
                 * minFrameSize;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index fd1d376..274d446 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -41,4 +41,8 @@ public interface IHyracksTaskContext
     public IDatasetPartitionManager getDatasetPartitionManager();
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
+
+    public void setSharedObject(Object sharedObject);
+
+    public Object getSharedObject();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
index 81a0290..339eb9d 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -21,9 +21,6 @@ package org.apache.hyracks.api.dataflow;
 import java.io.Serializable;
 import java.util.BitSet;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
@@ -33,6 +30,8 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.ActivityCluster;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * Connector that connects operators in a Job.
@@ -40,6 +39,7 @@ import org.apache.hyracks.api.job.ActivityCluster;
  * @author vinayakb
  */
 public interface IConnectorDescriptor extends Serializable {
+
     /**
      * Gets the id of the connector.
      *
@@ -68,7 +68,7 @@ public interface IConnectorDescriptor extends Serializable {
      */
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException;
+                    throws HyracksDataException;
 
     /**
      * Factory metod to create the receive side reader that reads data from this

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 61baf82..e99fea8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -95,6 +95,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
+    private Object sharedObject;
+
     public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
             NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
@@ -383,4 +385,14 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, deploymentId);
     }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+        this.sharedObject = sharedObject;
+    }
+
+    @Override
+    public Object getSharedObject() {
+        return sharedObject;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index fd71716..1553605 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -29,6 +29,18 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
 public class AbstractFrameAppender implements IFrameAppender {
     protected IFrame frame;
     protected byte[] array; // cached the getBuffer().array to speed up byte array access a little
@@ -46,7 +58,7 @@ public class AbstractFrameAppender implements IFrameAppender {
     }
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
-        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength)
                 + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index c9c51d3..136e231 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -40,6 +40,10 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
         reset(frame, clear);
     }
 
+    /**
+     * append fieldSlots and bytes to the current frame
+     */
+    @Override
     public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
             for (int i = 0; i < fieldSlots.length; ++i) {
@@ -50,27 +54,28 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(0, length)) {
             System.arraycopy(bytes, offset, getBuffer().array(), tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
             throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
@@ -83,17 +88,16 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + effectiveSlots * 4, length);
             tupleDataEndOffset += effectiveSlots * 4 + length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
             throws HyracksDataException {
         int length = tEndOffset - tStartOffset;
@@ -101,25 +105,25 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
             ByteBuffer src = tupleAccessor.getBuffer();
             System.arraycopy(src.array(), tStartOffset, array, tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
         return append(tupleAccessor, tStartOffset, tEndOffset);
     }
 
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
-            int tIndex1) throws HyracksDataException {
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -143,22 +147,22 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
             int offset1, int dataLen1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
@@ -176,21 +180,19 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
             System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0);
             // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < fieldSlots1.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
-                        (fieldSlots1[i] + dataLen0));
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy bytes1
-            System.arraycopy(bytes1, offset1, array,
-                    tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, dataLen1);
+            System.arraycopy(bytes1, offset1, array, tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0,
+                    dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
@@ -219,22 +221,21 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy bytes0
-            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
-                    dataLen0);
+            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields)
             throws HyracksDataException {
         int fTargetSlotsLength = fields.length * 4;
@@ -253,18 +254,17 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
                 int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
-                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, tupleDataEndOffset
-                        + fTargetSlotsLength + fStartOffset, fLen);
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
+                        tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
                 IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fEndOffset);
                 fStartOffset = fEndOffset;
             }
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
new file mode 100644
index 0000000..7100c11
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+public class MessagingFrameTupleAppender extends FrameTupleAppender {
+
+    public static final int MAX_MESSAGE_SIZE = 100;
+    private final IHyracksTaskContext ctx;
+
+    public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+    }
+
+    @Override
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
+                    frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (tupleCount > 0) {
+            appendMessage();
+            getBuffer().clear();
+            outWriter.nextFrame(getBuffer());
+            if (clearFrame) {
+                frame.reset();
+                reset(getBuffer(), true);
+            }
+        }
+    }
+
+    public void appendMessage() {
+        ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
+        tupleDataEndOffset += message.limit();
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
index b808ac1..d2100bb 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -26,7 +26,14 @@ public class IntSerDeUtils {
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
 
+    /**
+     * put integer value into the array bytes at the offset offset
+     * @param bytes byte array to put data in
+     * @param offset offset from the beginning of the array to write the {@code value} in
+     * @param value value to write to {@code bytes[offset]}
+     */
     public static void putInt(byte[] bytes, int offset, int value) {
+
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index cf4808f..7d97507 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -44,7 +44,7 @@ public abstract class AbstractMToNConnectorDescriptor extends AbstractConnectorD
     }
 
     @Override
-    public boolean allProducersToAllConsumers(){
+    public boolean allProducersToAllConsumers() {
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 7856d6a..44d77ac 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -41,8 +41,8 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT
 
     private ITuplePartitionComputerFactory tpcf;
 
-    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            ILocalityMap localityMap) {
+    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, ILocalityMap localityMap) {
         super(spec);
         this.localityMap = localityMap;
         this.tpcf = tpcf;
@@ -60,7 +60,7 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(),
                 nConsumerPartitions, localityMap, index);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 22a4c1c..d5e4e20 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
-    private ITuplePartitionComputerFactory tpcf;
+    protected ITuplePartitionComputerFactory tpcf;
 
     public MToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf) {
         super(spec);
@@ -45,15 +45,13 @@ public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDe
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
-        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, tpcf.createPartitioner());
-        return hashWriter;
+                    throws HyracksDataException {
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -61,4 +59,8 @@ public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDe
         NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
+
+    public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() {
+        return tpcf;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
new file mode 100644
index 0000000..e90d8b0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+
+public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitioningConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf) {
+        super(spec, tpcf);
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+                    throws HyracksDataException {
+        return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index cfa0cf9..dde29c1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -48,13 +48,13 @@ public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(index);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -69,8 +69,8 @@ public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
         OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
         OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
 
-        constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
-                new PartitionCountExpression(producer)));
+        constraintAcceptor.addConstraint(
+                new Constraint(new PartitionCountExpression(consumer), new PartitionCountExpression(producer)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 646883f..f84c3e4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -51,7 +51,7 @@ public class PartitionDataWriter implements IFrameWriter {
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
-                appenders[i] = new FrameTupleAppender();
+                appenders[i] = createTupleAppender(ctx);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -61,6 +61,10 @@ public class PartitionDataWriter implements IFrameWriter {
         this.ctx = ctx;
     }
 
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new FrameTupleAppender();
+    }
+
     @Override
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
new file mode 100644
index 0000000..4055fb0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+
+public class PartitionWithMessageDataWriter extends PartitionDataWriter {
+
+    public PartitionWithMessageDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc)
+                    throws HyracksDataException {
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor, tpc);
+    }
+
+    @Override
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new MessagingFrameTupleAppender(ctx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 0960927..d121ec4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -26,7 +26,7 @@ import java.io.InputStreamReader;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -52,7 +52,7 @@ public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
     }
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
index daf5104..f495b75 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
@@ -20,9 +20,9 @@ package org.apache.hyracks.dataflow.std.file;
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ITupleParserFactory extends Serializable {
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException;
+    public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
index eb2714f..4558cf9 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
@@ -27,7 +27,7 @@ import java.util.Arrays;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -41,7 +41,7 @@ public class WordTupleParserFactory implements ITupleParserFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c6f2d18d/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 6d954eb..9848ffb 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -137,4 +137,13 @@ public class TestTaskContext implements IHyracksTaskContext {
     public ExecutorService getExecutorService() {
         return null;
     }
+
+    @Override
+    public Object getSharedObject() {
+        return null;
+    }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+    }
 }
\ No newline at end of file