You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org> on 2016/01/31 16:45:13 UTC

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/604

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
A algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
19 files changed, 608 insertions(+), 73 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/1

diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
index b95d279..f4dbee7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -51,14 +50,21 @@
     private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
 
     private final boolean bulkload;
+    private final boolean feed;
 
     public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
             List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
+        this(dataSource, payloadExpr, primaryKeyExprs, operation, bulkload, false);
+    }
+
+    public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload, boolean feed) {
         this.dataSource = dataSource;
         this.payloadExpr = payloadExpr;
         this.primaryKeyExprs = primaryKeyExprs;
         this.operation = operation;
         this.bulkload = bulkload;
+        this.feed = feed;
     }
 
     @Override
@@ -68,7 +74,8 @@
     }
 
     @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
         boolean changed = false;
         changed = transform.transform(payloadExpr);
         for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
@@ -120,7 +127,7 @@
 
     public boolean isBulkload() {
         return bulkload;
-	}
+    }
 
     public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
         this.additionalFilteringExpressions = additionalFilteringExpressions;
@@ -130,4 +137,8 @@
         return additionalFilteringExpressions;
     }
 
+    public boolean isFeed() {
+        return feed;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 0ff1e47..0bfc417 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -47,8 +47,8 @@
 
 public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
 
-    private List<LogicalVariable> hashFields;
-    private INodeDomain domain;
+    protected List<LogicalVariable> hashFields;
+    protected INodeDomain domain;
 
     public HashPartitionExchangePOperator(List<LogicalVariable> hashFields, INodeDomain domain) {
         this.hashFields = hashFields;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
new file mode 100644
index 0000000..4ddf4a9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
@@ -0,0 +1,45 @@
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
+
+public class HashPartitionExchangeWithMessagePOperator extends HashPartitionExchangePOperator {
+
+    public HashPartitionExchangeWithMessagePOperator(List<LogicalVariable> hashFields, INodeDomain domain) {
+        super(hashFields, domain);
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        int[] keys = new int[hashFields.size()];
+        IBinaryHashFunctionFactory[] hashFunctionFactories = new IBinaryHashFunctionFactory[hashFields.size()];
+        int i = 0;
+        IBinaryHashFunctionFactoryProvider hashFunProvider = context.getBinaryHashFunctionFactoryProvider();
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        for (LogicalVariable v : hashFields) {
+            keys[i] = opSchema.findVariable(v);
+            hashFunctionFactories[i] = hashFunProvider.getBinaryHashFunctionFactory(env.getVarType(v));
+            ++i;
+        }
+        ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
+        IConnectorDescriptor conn = new MToNPartitioningWithMessageConnectorDescriptor(spec, tpcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 8bf1ad5..d105af4 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -46,6 +46,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
@@ -54,6 +55,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangeWithMessagePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -570,7 +572,11 @@
                         }
                     }
                     if (!propWasSet) {
-                        pop = new HashPartitionExchangePOperator(varList, domain);
+                        if (op instanceof InsertDeleteOperator && ((InsertDeleteOperator) op).isFeed()) {
+                            pop = new HashPartitionExchangeWithMessagePOperator(varList, domain);
+                        } else {
+                            pop = new HashPartitionExchangePOperator(varList, domain);
+                        }
                     }
                     break;
                 }
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
index eb6b888..473f3ae 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
@@ -54,7 +54,7 @@
      * nbytes the actual data.
      * If the tupleLength includes the field slot, please set the fieldCount = 0
      */
-    public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+    public static int calcRequiredSpace(int fieldCount, int tupleLength) {
         return 4 + fieldCount * 4 + tupleLength;
     }
 
@@ -68,7 +68,7 @@
      */
     public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) {
         assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
-        return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+        return (1 + (calcRequiredSpace(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
                 * minFrameSize;
     }
 
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index fd1d376..3b48bfd 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.context;
 
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -41,4 +42,12 @@
     public IDatasetPartitionManager getDatasetPartitionManager();
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
+
+    public void setSharedObject(HashMap<String, Object> sharedObject);
+
+    public HashMap<String, Object> getSharedObject();
+
+    public void setObject(String name, Object object);
+
+    public Object getObject(String name);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index ba71b0c..9e9a960 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -76,7 +76,7 @@
             }
         }
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Result state cleanup instance successfully completed.");
+            //LOGGER.info("Result state cleanup instance successfully completed.");
         }
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 61baf82..3518601 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -95,6 +95,8 @@
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
+    private HashMap<String, Object> taskSharedObject;
+
     public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
             NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
@@ -383,4 +385,27 @@
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, deploymentId);
     }
+
+    @Override
+    public void setObject(String name, Object object) {
+        if (taskSharedObject == null) {
+            taskSharedObject = new HashMap<>();
+        }
+        taskSharedObject.put(name, object);
+    }
+
+    @Override
+    public Object getObject(String name) {
+        return taskSharedObject.get(name);
+    }
+
+    @Override
+    public void setSharedObject(HashMap<String, Object> sharedObject) {
+        this.taskSharedObject = sharedObject;
+    }
+
+    @Override
+    public HashMap<String, Object> getSharedObject() {
+        return taskSharedObject;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index fd71716..1553605 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -29,6 +29,18 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
 public class AbstractFrameAppender implements IFrameAppender {
     protected IFrame frame;
     protected byte[] array; // cached the getBuffer().array to speed up byte array access a little
@@ -46,7 +58,7 @@
     }
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
-        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength)
                 + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig
new file mode 100644
index 0000000..4c1626a
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java.orig
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.comm.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.FrameConstants;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
+public class AbstractFrameAppender implements IFrameAppender {
+    protected IFrame frame;
+    protected byte[] array; // cached the getBuffer().array to speed up byte array access a little
+
+    protected int tupleCount;
+    protected int tupleDataEndOffset;
+
+    @Override
+    public void reset(IFrame frame, boolean clear) throws HyracksDataException {
+        this.frame = frame;
+        if (clear) {
+            this.frame.reset();
+        }
+        reset(getBuffer(), clear);
+    }
+
+    protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
+<<<<<<< HEAD
+        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
+=======
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength)
+>>>>>>> Add Feed Messages to Frames
+                + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
+    }
+
+    protected void reset(ByteBuffer buffer, boolean clear) {
+        array = buffer.array();
+        if (clear) {
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), 0);
+            tupleCount = 0;
+            tupleDataEndOffset = FrameConstants.TUPLE_START_OFFSET;
+        } else {
+            tupleCount = IntSerDeUtils.getInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()));
+            tupleDataEndOffset = tupleCount == 0 ? FrameConstants.TUPLE_START_OFFSET
+                    : IntSerDeUtils.getInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize())
+                            - tupleCount * FrameConstants.SIZE_LEN);
+        }
+    }
+
+    @Override
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return frame.getBuffer();
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        getBuffer().clear();
+        if (getTupleCount() > 0) {
+            outWriter.nextFrame(getBuffer());
+        }
+        if (clearFrame) {
+            frame.reset();
+            reset(getBuffer(), true);
+        }
+    }
+
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength, frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index c9c51d3..136e231 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -40,6 +40,10 @@
         reset(frame, clear);
     }
 
+    /**
+     * append fieldSlots and bytes to the current frame
+     */
+    @Override
     public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
             for (int i = 0; i < fieldSlots.length; ++i) {
@@ -50,27 +54,28 @@
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(0, length)) {
             System.arraycopy(bytes, offset, getBuffer().array(), tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
             throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
@@ -83,17 +88,16 @@
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + effectiveSlots * 4, length);
             tupleDataEndOffset += effectiveSlots * 4 + length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
             throws HyracksDataException {
         int length = tEndOffset - tStartOffset;
@@ -101,25 +105,25 @@
             ByteBuffer src = tupleAccessor.getBuffer();
             System.arraycopy(src.array(), tStartOffset, array, tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
         return append(tupleAccessor, tStartOffset, tEndOffset);
     }
 
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
-            int tIndex1) throws HyracksDataException {
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -143,22 +147,22 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
             int offset1, int dataLen1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
@@ -176,21 +180,19 @@
             System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0);
             // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < fieldSlots1.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
-                        (fieldSlots1[i] + dataLen0));
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy bytes1
-            System.arraycopy(bytes1, offset1, array,
-                    tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, dataLen1);
+            System.arraycopy(bytes1, offset1, array, tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0,
+                    dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
@@ -219,22 +221,21 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy bytes0
-            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
-                    dataLen0);
+            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields)
             throws HyracksDataException {
         int fTargetSlotsLength = fields.length * 4;
@@ -253,18 +254,17 @@
                 int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
-                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, tupleDataEndOffset
-                        + fTargetSlotsLength + fStartOffset, fLen);
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
+                        tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
                 IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fEndOffset);
                 fStartOffset = fEndOffset;
             }
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
new file mode 100644
index 0000000..ca916f5
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.io;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+public class MessagingFrameTupleAppender extends FrameTupleAppender {
+    public static final String KEY_MESSAGE = "message";
+    public static final String KEY_LENGTH = "length";
+    public static final String KEY_SLOTS = "slots";
+    public static final int MAX_MESSAGE_SIZE = 100;
+    private IHyracksTaskContext ctx;
+
+    public MessagingFrameTupleAppender(IHyracksTaskContext ctx) throws HyracksDataException {
+        this.ctx = ctx;
+    }
+
+    public MessagingFrameTupleAppender(IFrame frame, IHyracksTaskContext ctx) throws HyracksDataException {
+        super(frame);
+        this.ctx = ctx;
+    }
+
+    @Override
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
+                    frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (getTupleCount() > 0) {
+            appendMessage();
+            getBuffer().clear();
+            outWriter.nextFrame(getBuffer());
+        }
+        if (clearFrame) {
+            frame.reset();
+            reset(getBuffer(), true);
+        }
+    }
+
+    public void appendMessage() {
+        byte[] message = (byte[]) ctx.getObject(KEY_MESSAGE);
+        int length = ((MutableInt) ctx.getObject(KEY_LENGTH)).intValue();
+        int[] slots = (int[]) ctx.getObject(KEY_SLOTS);
+        for (int i = 0; i < slots.length; ++i) {
+            IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, slots[i]);
+        }
+        System.arraycopy(message, 0, array, tupleDataEndOffset + slots.length * 4, length);
+        tupleDataEndOffset += slots.length * 4 + length;
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
index b808ac1..b01eb15 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -26,7 +26,14 @@
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
 
+    /**
+     * put integer value into the array bytes at the offset offset
+     * @param bytes byte array to put data in
+     * @param offset offset from the beginning of the array to write the {@code value} in 
+     * @param value value to write to {@code bytes[offset]}
+     */
     public static void putInt(byte[] bytes, int offset, int value) {
+
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
index 91c29e2..81ebba3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicFrameReader.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class NonDeterministicFrameReader implements IFrameReader {
-    private final NonDeterministicChannelReader channelReader;
+    protected final NonDeterministicChannelReader channelReader;
 
     public NonDeterministicFrameReader(NonDeterministicChannelReader channelReader) {
         this.channelReader = channelReader;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 22a4c1c..85c26ab 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -35,7 +35,7 @@
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
-    private ITuplePartitionComputerFactory tpcf;
+    protected ITuplePartitionComputerFactory tpcf;
 
     public MToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf) {
         super(spec);
@@ -45,15 +45,15 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
-        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, tpcf.createPartitioner());
+                    throws HyracksDataException {
+        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
new file mode 100644
index 0000000..a1909cd
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -0,0 +1,45 @@
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionCollector;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
+import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
+import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
+
+public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitioningConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf) {
+        super(spec, tpcf);
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+                    throws HyracksDataException {
+        final PartitionWithMessageDataWriter hashWriter = new PartitionWithMessageDataWriter(ctx, nConsumerPartitions,
+                edwFactory, recordDesc, tpcf.createPartitioner());
+        return hashWriter;
+    }
+
+    @Override
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        BitSet expectedPartitions = new BitSet(nProducerPartitions);
+        expectedPartitions.set(0, nProducerPartitions);
+        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
+                expectedPartitions);
+        NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
+        return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 646883f..64ca8e7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -33,14 +33,14 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class PartitionDataWriter implements IFrameWriter {
-    private final int consumerPartitionCount;
-    private final IFrameWriter[] pWriters;
-    private final boolean[] isOpen;
-    private final FrameTupleAppender[] appenders;
-    private final FrameTupleAccessor tupleAccessor;
-    private final ITuplePartitionComputer tpc;
-    private final IHyracksTaskContext ctx;
-    private boolean allocatedFrame = false;
+    protected final int consumerPartitionCount;
+    protected final IFrameWriter[] pWriters;
+    protected final boolean[] isOpen;
+    protected final FrameTupleAppender[] appenders;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final ITuplePartitionComputer tpc;
+    protected final IHyracksTaskContext ctx;
+    protected boolean allocatedFrame = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
new file mode 100644
index 0000000..3d58bf1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
@@ -0,0 +1,148 @@
+package org.apache.hyracks.dataflow.std.connectors;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+
+public class PartitionWithMessageDataWriter implements IFrameWriter {
+
+    protected final int consumerPartitionCount;
+    protected final IFrameWriter[] pWriters;
+    protected final boolean[] isOpen;
+    protected final MessagingFrameTupleAppender[] appenders;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final ITuplePartitionComputer tpc;
+    protected final IHyracksTaskContext ctx;
+    protected boolean allocatedFrame = false;
+
+    public PartitionWithMessageDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc)
+                    throws HyracksDataException {
+        this.consumerPartitionCount = consumerPartitionCount;
+        pWriters = new IFrameWriter[consumerPartitionCount];
+        isOpen = new boolean[consumerPartitionCount];
+        appenders = new MessagingFrameTupleAppender[consumerPartitionCount];
+        for (int i = 0; i < consumerPartitionCount; ++i) {
+            try {
+                pWriters[i] = pwFactory.createFrameWriter(i);
+                appenders[i] = new MessagingFrameTupleAppender(ctx);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.tpc = tpc;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        HyracksDataException closeException = null;
+        for (int i = 0; i < pWriters.length; ++i) {
+            if (isOpen[i]) {
+                if (allocatedFrame) {
+                    try {
+                        appenders[i].write(pWriters[i], true);
+                    } catch (Throwable th) {
+                        if (closeException == null) {
+                            closeException = new HyracksDataException(th);
+                        } else {
+                            closeException.addSuppressed(th);
+                        }
+                    }
+                }
+                try {
+                    pWriters[i].close();
+                } catch (Throwable th) {
+                    if (closeException == null) {
+                        closeException = new HyracksDataException(th);
+                    } else {
+                        closeException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (closeException != null) {
+            throw closeException;
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        allocateFrames();
+        for (int i = 0; i < pWriters.length; ++i) {
+            isOpen[i] = true;
+            pWriters[i].open();
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tupleAccessor.reset(buffer);
+        int tupleCount = tupleAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; ++i) {
+            int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
+            FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
+        }
+    }
+
+    private void allocateFrames() throws HyracksDataException {
+        for (int i = 0; i < appenders.length; ++i) {
+            appenders[i].reset(new VSizeFrame(ctx), true);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        HyracksDataException failException = null;
+        for (int i = 0; i < appenders.length; ++i) {
+            if (isOpen[i]) {
+                try {
+                    pWriters[i].fail();
+                } catch (Throwable th) {
+                    if (failException == null) {
+                        failException = new HyracksDataException(th);
+                    } else {
+                        failException.addSuppressed(th);
+                    }
+                }
+            }
+        }
+        if (failException != null) {
+            throw failException;
+        }
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        for (int i = 0; i < consumerPartitionCount; i++) {
+            appenders[i].flush(pWriters[i]);
+        }
+    }
+
+    public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender frameTupleAppender,
+            IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
+        int flushedBytes = 0;
+        if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+            flushedBytes = frameTupleAppender.getBuffer().capacity();
+            frameTupleAppender.write(writer, true);
+            if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
+                throw new HyracksDataException("The output cannot be fit into a frame.");
+            }
+        }
+        return flushedBytes;
+    }
+
+}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 6d954eb..531ba46 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.test.support;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -39,11 +40,13 @@
     private final TestJobletContext jobletContext;
     private final TaskAttemptId taskId;
     private WorkspaceFileFactory fileFactory;
+    private HashMap<String, Object> map;
 
     public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
         this.jobletContext = jobletContext;
         this.taskId = taskId;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
+        map = new HashMap<>();
     }
 
     @Override
@@ -137,4 +140,24 @@
     public ExecutorService getExecutorService() {
         return null;
     }
+
+    @Override
+    public void setObject(String name, Object object) {
+        map.put(name, object);
+    }
+
+    @Override
+    public Object getObject(String name) {
+        return map.get(name);
+    }
+
+    @Override
+    public void setSharedObject(HashMap<String, Object> sharedObject) {
+        this.map = sharedObject;
+    }
+
+    @Override
+    public HashMap<String, Object> getSharedObject() {
+        return map;
+    }
 }
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/765/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Till Westmann (Code Review)" <do...@asterixdb.incubator.apache.org>.
Till Westmann has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 3:

(7 comments)

For the first round I've got a few questions at the code level (and not all of them are suggestions :) ).

https://asterix-gerrit.ics.uci.edu/#/c/604/3/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java:

Line 53:     private final boolean feed;
Why does a Hyracks operator know anything about feeds?

Would there be a more generic way to describe the property?

Also, if we need an additional piece of information, why don't we use the existing annotations that are available on each operator?
(The same question probably applies to bulkload.)


https://asterix-gerrit.ics.uci.edu/#/c/604/3/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java:

Line 51:     protected INodeDomain domain;
It seems that these could remain private.


https://asterix-gerrit.ics.uci.edu/#/c/604/3/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
File algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java:

Line 575:                         if (op instanceof InsertDeleteOperator && ((InsertDeleteOperator) op).isFeed()) {
If we used operator annotations we could also avoid the instanceof and cast here.


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
File hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java:

Line 52:     public Object getObject(String name);
Could we achieve this with the state object of the IOperatorEnvironment?

If we need the methods here, I think that we should only have one setter/getter pair. Right now we have 2 pairs of methods on the interface that are linked in the implementation. So 1 pair seems to be redundant.

If we need the information on this context, what are the trade-offs between adding it here and adding it to the IHyracksCommonContext (and avoiding the other interface changes, e.g. in AsterixDB's ITupleForwarder)?


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java:

Line 41:     public MessagingFrameTupleAppender(IFrame frame, IHyracksTaskContext ctx) throws HyracksDataException {
There seems to be no need for this constructor.


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java:

Line 49:         final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
This variable could be removed.


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java:

Line 42:         final PartitionWithMessageDataWriter hashWriter = new PartitionWithMessageDataWriter(ctx, nConsumerPartitions,
This variable could be removed.


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 3
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-HasComments: Yes

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Michael Blow, Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/604

to look at the new patch set (#5).

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteResultOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
M hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
21 files changed, 295 insertions(+), 81 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/5
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 5
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 2: Verified-1

Build Failed 

https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/766/ : FAILURE

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 2
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 6:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/838/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 6
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 1: Verified-1

Build Failed 

https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/765/ : FAILURE

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 6: Verified+1

Build Successful 

https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/838/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 6
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 5: Verified+1

Build Successful 

https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/837/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 5
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 2:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/766/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 2
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 4:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/832/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 4
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 4: Verified+1

Build Successful 

https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/832/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 4
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 3: Verified+1

Build Successful 

https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/767/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 3
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 3:

(7 comments)

https://asterix-gerrit.ics.uci.edu/#/c/604/3/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java:

Line 53:     private final boolean feed;
> Why does a Hyracks operator know anything about feeds?
Moved to alterJobSpecForFeed


https://asterix-gerrit.ics.uci.edu/#/c/604/3/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java:

Line 51:     protected INodeDomain domain;
> It seems that these could remain private.
Done


https://asterix-gerrit.ics.uci.edu/#/c/604/3/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
File algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java:

Line 575:                         if (op instanceof InsertDeleteOperator && ((InsertDeleteOperator) op).isFeed()) {
> If we used operator annotations we could also avoid the instanceof and cast
removed completely


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
File hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java:

Line 52:     public Object getObject(String name);
> Could we achieve this with the state object of the IOperatorEnvironment?
Using the state of the IOperatorEnvironment can be done, but I believe that it wouldn't be a wise choice: all current methods of the interface would have to be implemented without ever being used.

I have removed the redundant methods. Adding it to the IHyracksCommonContext should be fine but the refactoring in the ITupleForwarder is needed since we're just being more specific.


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java:

Line 41:     public MessagingFrameTupleAppender(IFrame frame, IHyracksTaskContext ctx) throws HyracksDataException {
> There seems to be no need for this constructor.
Done


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java:

Line 49:         final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
> This variable could be removed.
Done


https://asterix-gerrit.ics.uci.edu/#/c/604/3/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java:

Line 42:         final PartitionWithMessageDataWriter hashWriter = new PartitionWithMessageDataWriter(ctx, nConsumerPartitions,
> This variable could be removed.
Done


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 3
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/604

to look at the new patch set (#2).

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
A algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
16 files changed, 328 insertions(+), 65 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/2
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 2
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Murtadha Hubail has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 6:

(5 comments)

https://asterix-gerrit.ics.uci.edu/#/c/604/6/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteResultOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteResultOperator.java:

Line 23: 
Revert file


https://asterix-gerrit.ics.uci.edu/#/c/604/6/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java:

Line 93:         int i = 0;
Revert this file


https://asterix-gerrit.ics.uci.edu/#/c/604/6/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java:

Line 33:     private IHyracksTaskContext ctx;
final


Line 55:         if (getTupleCount() > 0) {
use tupleCount variable


https://asterix-gerrit.ics.uci.edu/#/c/604/6/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java:

Line 64:     protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx2) {
ctx2 -> ctx


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 6
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Michael Blow, Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/604

to look at the new patch set (#6).

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteResultOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
M hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
21 files changed, 295 insertions(+), 81 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/6
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 6
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Michael Blow, Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/604

to look at the new patch set (#7).

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
M hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
19 files changed, 289 insertions(+), 78 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/7
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 7
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 7:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/839/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 7
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 6:

(5 comments)

https://asterix-gerrit.ics.uci.edu/#/c/604/6/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteResultOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteResultOperator.java:

Line 23: 
> Revert file
Done


https://asterix-gerrit.ics.uci.edu/#/c/604/6/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
File algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java:

Line 93:         int i = 0;
> Revert this file
Done


https://asterix-gerrit.ics.uci.edu/#/c/604/6/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
File hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java:

Line 33:     private IHyracksTaskContext ctx;
> final
Done


Line 55:         if (getTupleCount() > 0) {
> use tupleCount variable
Done


https://asterix-gerrit.ics.uci.edu/#/c/604/6/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
File hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java:

Line 64:     protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx2) {
> ctx2 -> ctx
Done


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 6
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 5:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/837/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 5
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Michael Blow (Code Review)" <do...@asterixdb.incubator.apache.org>.
Michael Blow has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 4: Code-Review+1

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 4
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has submitted this change and it was merged.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Reviewed-on: https://asterix-gerrit.ics.uci.edu/604
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mi...@couchbase.com>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>
---
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
M hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
19 files changed, 289 insertions(+), 78 deletions(-)

Approvals:
  Michael Blow: Looks good to me, but someone else must approve
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Verified



diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
index eb6b888..473f3ae 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
@@ -54,7 +54,7 @@
      * nbytes the actual data.
      * If the tupleLength includes the field slot, please set the fieldCount = 0
      */
-    public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+    public static int calcRequiredSpace(int fieldCount, int tupleLength) {
         return 4 + fieldCount * 4 + tupleLength;
     }
 
@@ -68,7 +68,7 @@
      */
     public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) {
         assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
-        return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+        return (1 + (calcRequiredSpace(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
                 * minFrameSize;
     }
 
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index fd1d376..274d446 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -41,4 +41,8 @@
     public IDatasetPartitionManager getDatasetPartitionManager();
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
+
+    public void setSharedObject(Object sharedObject);
+
+    public Object getSharedObject();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
index 81a0290..339eb9d 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -21,9 +21,6 @@
 import java.io.Serializable;
 import java.util.BitSet;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
@@ -33,6 +30,8 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.ActivityCluster;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * Connector that connects operators in a Job.
@@ -40,6 +39,7 @@
  * @author vinayakb
  */
 public interface IConnectorDescriptor extends Serializable {
+
     /**
      * Gets the id of the connector.
      *
@@ -68,7 +68,7 @@
      */
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException;
+                    throws HyracksDataException;
 
     /**
      * Factory metod to create the receive side reader that reads data from this
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 61baf82..e99fea8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -95,6 +95,8 @@
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
+    private Object sharedObject;
+
     public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
             NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
@@ -383,4 +385,14 @@
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, deploymentId);
     }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+        this.sharedObject = sharedObject;
+    }
+
+    @Override
+    public Object getSharedObject() {
+        return sharedObject;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index fd71716..1553605 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -29,6 +29,18 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
 public class AbstractFrameAppender implements IFrameAppender {
     protected IFrame frame;
     protected byte[] array; // cached the getBuffer().array to speed up byte array access a little
@@ -46,7 +58,7 @@
     }
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
-        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength)
                 + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index c9c51d3..136e231 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -40,6 +40,10 @@
         reset(frame, clear);
     }
 
+    /**
+     * append fieldSlots and bytes to the current frame
+     */
+    @Override
     public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
             for (int i = 0; i < fieldSlots.length; ++i) {
@@ -50,27 +54,28 @@
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(0, length)) {
             System.arraycopy(bytes, offset, getBuffer().array(), tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
             throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
@@ -83,17 +88,16 @@
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + effectiveSlots * 4, length);
             tupleDataEndOffset += effectiveSlots * 4 + length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
             throws HyracksDataException {
         int length = tEndOffset - tStartOffset;
@@ -101,25 +105,25 @@
             ByteBuffer src = tupleAccessor.getBuffer();
             System.arraycopy(src.array(), tStartOffset, array, tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
         return append(tupleAccessor, tStartOffset, tEndOffset);
     }
 
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
-            int tIndex1) throws HyracksDataException {
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -143,22 +147,22 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
             int offset1, int dataLen1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
@@ -176,21 +180,19 @@
             System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0);
             // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < fieldSlots1.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
-                        (fieldSlots1[i] + dataLen0));
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy bytes1
-            System.arraycopy(bytes1, offset1, array,
-                    tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, dataLen1);
+            System.arraycopy(bytes1, offset1, array, tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0,
+                    dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
@@ -219,22 +221,21 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy bytes0
-            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
-                    dataLen0);
+            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields)
             throws HyracksDataException {
         int fTargetSlotsLength = fields.length * 4;
@@ -253,18 +254,17 @@
                 int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
-                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, tupleDataEndOffset
-                        + fTargetSlotsLength + fStartOffset, fLen);
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
+                        tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
                 IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fEndOffset);
                 fStartOffset = fEndOffset;
             }
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
new file mode 100644
index 0000000..7100c11
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+public class MessagingFrameTupleAppender extends FrameTupleAppender {
+
+    public static final int MAX_MESSAGE_SIZE = 100;
+    private final IHyracksTaskContext ctx;
+
+    public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+    }
+
+    @Override
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
+                    frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (tupleCount > 0) {
+            appendMessage();
+            getBuffer().clear();
+            outWriter.nextFrame(getBuffer());
+            if (clearFrame) {
+                frame.reset();
+                reset(getBuffer(), true);
+            }
+        }
+    }
+
+    public void appendMessage() {
+        ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
+        tupleDataEndOffset += message.limit();
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
index b808ac1..d2100bb 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -26,7 +26,14 @@
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
 
+    /**
+     * Puts the integer {@code value} into the array {@code bytes}, starting at {@code offset}.
+     * @param bytes byte array to put data in
+     * @param offset offset from the beginning of the array to write the {@code value} in
+     * @param value value to write to {@code bytes[offset]}
+     */
     public static void putInt(byte[] bytes, int offset, int value) {
+
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index cf4808f..7d97507 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -44,7 +44,7 @@
     }
 
     @Override
-    public boolean allProducersToAllConsumers(){
+    public boolean allProducersToAllConsumers() {
         return true;
     }
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 7856d6a..44d77ac 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -41,8 +41,8 @@
 
     private ITuplePartitionComputerFactory tpcf;
 
-    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            ILocalityMap localityMap) {
+    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, ILocalityMap localityMap) {
         super(spec);
         this.localityMap = localityMap;
         this.tpcf = tpcf;
@@ -60,7 +60,7 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(),
                 nConsumerPartitions, localityMap, index);
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 22a4c1c..d5e4e20 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -35,7 +35,7 @@
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
-    private ITuplePartitionComputerFactory tpcf;
+    protected ITuplePartitionComputerFactory tpcf;
 
     public MToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf) {
         super(spec);
@@ -45,15 +45,13 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
-        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, tpcf.createPartitioner());
-        return hashWriter;
+                    throws HyracksDataException {
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -61,4 +59,8 @@
         NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
+
+    public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() {
+        return tpcf;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
new file mode 100644
index 0000000..e90d8b0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+
+public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitioningConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf) {
+        super(spec, tpcf);
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+                    throws HyracksDataException {
+        return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index cfa0cf9..dde29c1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -48,13 +48,13 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(index);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -69,8 +69,8 @@
         OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
         OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
 
-        constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
-                new PartitionCountExpression(producer)));
+        constraintAcceptor.addConstraint(
+                new Constraint(new PartitionCountExpression(consumer), new PartitionCountExpression(producer)));
     }
 
     @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 646883f..f84c3e4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -51,7 +51,7 @@
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
-                appenders[i] = new FrameTupleAppender();
+                appenders[i] = createTupleAppender(ctx);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -61,6 +61,10 @@
         this.ctx = ctx;
     }
 
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new FrameTupleAppender();
+    }
+
     @Override
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
new file mode 100644
index 0000000..4055fb0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+
+public class PartitionWithMessageDataWriter extends PartitionDataWriter {
+
+    public PartitionWithMessageDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc)
+                    throws HyracksDataException {
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor, tpc);
+    }
+
+    @Override
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new MessagingFrameTupleAppender(ctx);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 0960927..d121ec4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -52,7 +52,7 @@
     }
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
index daf5104..f495b75 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
@@ -20,9 +20,9 @@
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ITupleParserFactory extends Serializable {
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException;
+    public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
index eb2714f..4558cf9 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
@@ -27,7 +27,7 @@
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -41,7 +41,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 6d954eb..9848ffb 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -137,4 +137,13 @@
     public ExecutorService getExecutorService() {
         return null;
     }
+
+    @Override
+    public Object getSharedObject() {
+        return null;
+    }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+    }
 }
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 8
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>


Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 3:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/767/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 3
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 7: Verified+1

Build Successful 

https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/839/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 7
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Murtadha Hubail has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 7: Code-Review+2

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 7
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/604

to look at the new patch set (#4).

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteResultOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
M hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
21 files changed, 295 insertions(+), 81 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/4
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 4
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/604

to look at the new patch set (#3).

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
---
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
M algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
A algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangeWithMessagePOperator.java
M algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
M hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
A hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
A hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
M hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
16 files changed, 382 insertions(+), 65 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/04/604/3
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 3
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>

Change in hyracks[master]: Support Sending Messages Alongside Frame Data

Posted by "Michael Blow (Code Review)" <do...@asterixdb.incubator.apache.org>.
Michael Blow has posted comments on this change.

Change subject: Support Sending Messages Alongside Frame Data
......................................................................


Patch Set 7: Code-Review+1

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/604
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Gerrit-PatchSet: 7
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No