You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/18 08:49:55 UTC

incubator-asterixdb git commit: Support Sending Messages Alongside Frame Data

Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 0c8e22d4d -> c318249ef


Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context, where it is stored as the object shared between the
different operators in the pipeline. The first use of this feature will
be within feeds to request acks for at-least-once semantics.

Change-Id: Iaa23e9f8a909ddcafc1c3ee95181092eb04ee1ad
Reviewed-on: https://asterix-gerrit.ics.uci.edu/605
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/c318249e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/c318249e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/c318249e

Branch: refs/heads/master
Commit: c318249efe354a3657186a6ebecdfa8513ed22c9
Parents: 0c8e22d
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Wed Feb 17 15:26:38 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Wed Feb 17 23:44:49 2016 -0800

----------------------------------------------------------------------
 .../operators/physical/CommitRuntime.java       |  3 ++
 .../asterix/common/parse/ITupleForwarder.java   |  4 +--
 .../dataflow/CounterTimerTupleForwarder.java    |  4 +--
 .../external/dataflow/FeedTupleForwarder.java   | 11 ++++++--
 .../dataflow/FrameFullTupleForwarder.java       |  4 +--
 .../dataflow/RateControlledTupleForwarder.java  |  4 +--
 .../feed/runtime/CollectionRuntime.java         |  8 +++++-
 .../external/feed/runtime/IngestionRuntime.java |  9 +++++-
 .../FeedCollectOperatorNodePushable.java        |  4 +--
 .../FeedIntakeOperatorNodePushable.java         |  6 ++--
 .../operators/FeedMetaStoreNodePushable.java    | 21 +++++++++++++-
 .../asterix/external/util/FeedMessageUtils.java | 29 ++++++++++++++++++++
 .../adapter/TestTypedAdapterFactory.java        |  3 +-
 .../metadata/feeds/FeedMetadataUtil.java        |  6 ++++
 14 files changed, 96 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 7da4db7..b72018a 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -54,6 +54,8 @@ public class CommitRuntime implements IPushRuntime {
     protected final long[] longHashes;
     protected final LogRecord logRecord;
     protected final FrameTupleReference frameTupleReference;
+    protected final IHyracksTaskContext ctx;
+
     protected ITransactionContext transactionContext;
     protected FrameTupleAccessor frameTupleAccessor;
 
@@ -61,6 +63,7 @@ public class CommitRuntime implements IPushRuntime {
             boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction) {
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
+        this.ctx = ctx;
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
index 27f4fcb..afd8920 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -21,7 +21,7 @@ package org.apache.asterix.common.parse;
 import java.util.Map;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
@@ -38,7 +38,7 @@ public interface ITupleForwarder {
 
     public void configure(Map<String, String> configuration) throws HyracksDataException;
 
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
 
     public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
index 116ec09..5deaef0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -28,7 +28,7 @@ import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -69,7 +69,7 @@ public class CounterTimerTupleForwarder implements ITupleForwarder {
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         appender.reset(frame, true);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index b46a338..34a0207 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -18,14 +18,16 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.FeedMessageUtils;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -43,10 +45,15 @@ public class FeedTupleForwarder implements ITupleForwarder {
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
         this.appender = new FrameTupleAppender(frame);
+        // Set null feed message
+        ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+        // a null message
+        message.put(FeedMessageUtils.NULL_FEED_MESSAGE);
+        message.flip();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
index 36d41b4..eefc8c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -24,7 +24,7 @@ import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -42,7 +42,7 @@ public class FrameFullTupleForwarder implements ITupleForwarder {
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index 99cc3d1..186ca80 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -24,7 +24,7 @@ import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -50,7 +50,7 @@ public class RateControlledTupleForwarder implements ITupleForwarder {
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 967dc3e..8249fa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -27,6 +27,7 @@ import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Represents the feed runtime that collects feed tuples from another feed.
@@ -40,14 +41,16 @@ public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime
     private final ISubscribableRuntime sourceRuntime;       // Runtime that provides the data
     private final Map<String, String> feedPolicy;           // Policy associated with the feed
     private FeedFrameCollector frameCollector;              // Collector that can be plugged into a frame distributor
+    private final IHyracksTaskContext ctx;
 
     public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
             FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
-            Map<String, String> feedPolicy) {
+            Map<String, String> feedPolicy, IHyracksTaskContext ctx) {
         super(runtimeId, inputSideHandler, outputSideWriter);
         this.connectionId = connectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
+        this.ctx = ctx;
     }
 
     public State waitTillCollectionOver() throws InterruptedException {
@@ -93,4 +96,7 @@ public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime
         return frameCollector;
     }
 
+    public IHyracksTaskContext getCtx() {
+        return ctx;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index fd6fcb3..34cb575 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.feed.runtime;
 
+import java.nio.ByteBuffer;
 import java.util.logging.Level;
 
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
@@ -26,16 +27,20 @@ import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.asterix.external.feed.dataflow.FrameDistributor;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
     private final IAdapterRuntimeManager adapterRuntimeManager;
+    private final IHyracksTaskContext ctx;
 
     public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
-            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
+            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) {
         super(feedId, runtimeId, null, feedWriter, recordDesc);
         this.adapterRuntimeManager = adaptorRuntimeManager;
+        this.ctx = ctx;
     }
 
     @Override
@@ -45,12 +50,14 @@ public class IngestionRuntime extends SubscribableRuntime {
         collectionRuntime.setFrameCollector(reader);
 
         if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+            ctx.setSharedObject(ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE));
             adapterRuntimeManager.start();
         }
         subscribers.add(collectionRuntime);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
         }
+        collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 8916af6..7901f03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -146,7 +146,7 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
                 policyAccessor, false, tupleAccessor, recordDesc, feedManager, nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
-                sourceRuntime, feedPolicy);
+                sourceRuntime, feedPolicy, ctx);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
     }
@@ -180,7 +180,7 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
                 new FrameTupleAccessor(recordDesc), recordDesc, feedManager, nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
-                feedPolicy);
+                feedPolicy, ctx);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
         recordDesc = sourceRuntime.getRecordDescriptor();
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index b31f2bf..9398fa1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -28,11 +28,12 @@ import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
 import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -40,7 +41,6 @@ import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
 import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -103,7 +103,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
                 SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
                         partition);
                 ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
-                        adapterRuntimeManager);
+                        adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 feedFrameWriter.open();
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 3c4c9ad..9929358 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -33,12 +33,15 @@ import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -88,6 +91,8 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
     private FeedRuntimeInputHandler inputSideHandler;
 
+    private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+
     public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
             Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
@@ -101,6 +106,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.operandId = operationId;
+        ctx.setSharedObject(message);
     }
 
     @Override
@@ -116,7 +122,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
 
             coreOperator.open();
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.WARNING, "Failed to open feed store operator", e);
             throw new HyracksDataException(e);
         }
     }
@@ -161,6 +167,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
+            processFeedMessage(buffer);
             inputSideHandler.nextFrame(buffer);
         } catch (Exception e) {
             e.printStackTrace();
@@ -168,6 +175,18 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
         }
     }
 
+    private void processFeedMessage(ByteBuffer buffer) {
+        // read the message and reduce the number of tuples
+        fta.reset(buffer);
+        int tc = fta.getTupleCount() - 1;
+        int offset = fta.getTupleStartOffset(tc);
+        int len = fta.getTupleLength(tc);
+        message.clear();
+        message.put(buffer.array(), offset, len);
+        message.flip();
+        IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), tc);
+    }
+
     @Override
     public void fail() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.WARNING)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
new file mode 100644
index 0000000..4175ce1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class FeedMessageUtils {
+    public enum MessageType {
+        NULL,
+        ACK_REQUEST
+    }
+
+    public static final byte NULL_FEED_MESSAGE = (byte) MessageType.NULL.ordinal();
+    public static final byte ACK_REQ_FEED_MESSAGE = (byte) MessageType.ACK_REQUEST.ordinal();;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 5346bf2..6921392 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -31,7 +31,6 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -64,7 +63,7 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ITupleParser createTupleParser(final IHyracksCommonContext ctx) throws HyracksDataException {
+            public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
                 ADMDataParser parser;
                 ITupleForwarder forwarder;
                 ArrayTupleBuilder tb;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/c318249e/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index c69cd16..7ef51cb 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -84,6 +84,7 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 
 /**
  * A utility class for providing helper functions for feeds
@@ -212,6 +213,11 @@ public class FeedMetadataUtil {
         Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
         for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
             IConnectorDescriptor connDesc = entry.getValue();
+            if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
+                MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
+                connDesc = new MToNPartitioningWithMessageConnectorDescriptor(altered,
+                        m2nConn.getTuplePartitionComputerFactory());
+            }
             ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
             connectorMapping.put(entry.getKey(), newConnId);
         }