You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:01 UTC

[04/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 6380269..09bc5c3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -14,40 +14,66 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedCollectRuntimeInputHandler;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector;
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector.State;
+import edu.uci.ics.asterix.common.feeds.FeedId;
 import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.IntakePartitionStatistics;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import edu.uci.ics.asterix.common.feeds.StorageSideMonitoredBuffer;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.asterix.common.feeds.message.EndFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 /**
- * Runtime for the @see{FeedMessageOperatorDescriptor}
+ * Runtime for the FeedMessageOpertorDescriptor. This operator is responsible for communicating
+ * a feed message to the local feed manager on the host node controller.
+ * 
+ * @see FeedMessageOperatorDescriptor
+ *      IFeedMessage
+ *      IFeedManager
  */
 public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
 
-    private final FeedConnectionId feedId;
-    private final IFeedMessage feedMessage;
-    private final int partition;
-    private final IHyracksTaskContext ctx;
+    private final FeedConnectionId connectionId;
+    private final IFeedMessage message;
     private final IFeedManager feedManager;
+    private final int partition;
 
-    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedMessage feedMessage,
-            int partition, int nPartitions) {
-        this.feedId = feedId;
-        this.feedMessage = feedMessage;
+    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
+            IFeedMessage feedMessage, int partition, int nPartitions) {
+        this.connectionId = connectionId;
+        this.message = feedMessage;
         this.partition = partition;
-        this.ctx = ctx;
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.feedManager = runtimeCtx.getFeedManager();
@@ -57,47 +83,38 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
     public void initialize() throws HyracksDataException {
         try {
             writer.open();
-            FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
-            FeedRuntime feedRuntime = feedManager.getFeedRuntime(runtimeId);
-            boolean ingestionLocation = feedRuntime != null;
-
-            switch (feedMessage.getMessageType()) {
+            switch (message.getMessageType()) {
                 case END:
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Ending feed:" + feedId);
-                    }
-
-                    if (ingestionLocation) {
-                        AdapterRuntimeManager adapterRuntimeMgr = ((IngestionRuntime) feedRuntime)
-                                .getAdapterRuntimeManager();
-                        adapterRuntimeMgr.stop();
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Terminating ingestion for :" + feedId);
-                        }
+                    EndFeedMessage endFeedMessage = (EndFeedMessage) message;
+                    switch (endFeedMessage.getEndMessageType()) {
+                        case DISCONNECT_FEED:
+                            hanldeDisconnectFeedTypeMessage(endFeedMessage);
+                            break;
+                        case DISCONTINUE_SOURCE:
+                            handleDiscontinueFeedTypeMessage(endFeedMessage);
+                            break;
                     }
                     break;
-
-                case SUPER_FEED_MANAGER_ELECT:
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Registering Supers Feed Manager for :" + feedId);
-                    }
-                    FeedManagerElectMessage mesg = ((FeedManagerElectMessage) feedMessage);
-                    SuperFeedManager sfm = new SuperFeedManager(mesg.getFeedId(), mesg.getHost(), mesg.getNodeId(),
-                            mesg.getPort(), feedManager);
-                    synchronized (feedManager) {
-                        INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
-                        String nodeId = ncCtx.getNodeId();
-                        if (sfm.getNodeId().equals(nodeId)) {
-                            sfm.setLocal(true);
-                        } else {
-                            Thread.sleep(5000);
-                        }
-                        feedManager.registerSuperFeedManager(feedId, sfm);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Registered super feed mgr " + sfm + " for feed " + feedId);
-                        }
-                    }
+                case PREPARE_STALL: {
+                    handlePrepareStallMessage((PrepareStallMessage) message);
+                    break;
+                }
+                case TERMINATE_FLOW: {
+                    FeedConnectionId connectionId = ((TerminateDataFlowMessage) message).getConnectionId();
+                    handleTerminateFlowMessage(connectionId);
+                    break;
+                }
+                case COMMIT_ACK_RESPONSE: {
+                    handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage) message);
+                    break;
+                }
+                case THROTTLING_ENABLED: {
+                    handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage) message);
+                    break;
+                }
+                default:
                     break;
+
             }
 
         } catch (Exception e) {
@@ -106,4 +123,172 @@ public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOp
             writer.close();
         }
     }
+
+    private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
+        FeedConnectionId connectionId = throttlingMessage.getConnectionId();
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+        for (FeedRuntimeId runtimeId : runtimes) {
+            if (runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.STORE)) {
+                FeedRuntime storeRuntime = runtimeManager.getFeedRuntime(runtimeId);
+                ((StorageSideMonitoredBuffer) (storeRuntime.getInputHandler().getmBuffer())).setAcking(false);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Acking Disabled in view of throttling that has been activted upfron in the pipeline "
+                            + connectionId);
+                }
+            }
+        }
+    }
+
+    private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
+        FeedConnectionId connectionId = commitResponseMessage.getConnectionId();
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+        for (FeedRuntimeId runtimeId : runtimes) {
+            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+            switch (runtimeId.getFeedRuntimeType()) {
+                case COLLECT:
+                    FeedCollectRuntimeInputHandler inputHandler = (FeedCollectRuntimeInputHandler) runtime
+                            .getInputHandler();
+                    int maxBasePersisted = commitResponseMessage.getMaxWindowAcked();
+                    inputHandler.dropTill(IntakePartitionStatistics.ACK_WINDOW_SIZE * (maxBasePersisted + 1));
+                    break;
+                case STORE:
+                    MonitoredBufferStorageTimerTask sTask = runtime.getInputHandler().getmBuffer()
+                            .getStorageTimeTrackingRateTask();
+                    sTask.receiveCommitAckResponse(commitResponseMessage);
+                    break;
+            }
+        }
+
+        commitResponseMessage.getIntakePartition();
+        SubscribableFeedRuntimeId sid = new SubscribableFeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.INTAKE,
+                partition);
+        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedSubscriptionManager()
+                .getSubscribableRuntime(sid);
+        if (ingestionRuntime != null) {
+            IIntakeProgressTracker tracker = ingestionRuntime.getAdapterRuntimeManager().getProgressTracker();
+            if (tracker != null) {
+                tracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
+            }
+        }
+    }
+
+    private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+
+        boolean found = false;
+        for (FeedRuntimeId runtimeId : feedRuntimes) {
+            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+            if (runtime.getRuntimeId().getRuntimeType().equals(FeedRuntimeType.COLLECT)) {
+                ((CollectionRuntime) runtime).getFrameCollector().setState(State.HANDOVER);
+                found = true;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Switched " + runtime + " to Hand Over stage");
+                }
+            }
+        }
+        if (!found) {
+            throw new HyracksDataException("COLLECT Runtime  not found!");
+        }
+    }
+
+    private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
+        FeedConnectionId connectionId = prepareStallMessage.getConnectionId();
+        int computePartitionsRetainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+        for (FeedRuntimeId runtimeId : feedRuntimes) {
+            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+            switch (runtimeId.getFeedRuntimeType()) {
+                case COMPUTE:
+                    Mode requiredMode = runtimeId.getPartition() <= computePartitionsRetainLimit ? Mode.STALL
+                            : Mode.END;
+                    runtime.setMode(requiredMode);
+                    break;
+                default:
+                    runtime.setMode(Mode.STALL);
+                    break;
+            }
+        }
+    }
+
+    private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+        FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
+        SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                FeedRuntimeType.INTAKE, partition);
+        ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager().getSubscribableRuntime(
+                subscribableRuntimeId);
+        IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
+        adapterRuntimeManager.stop();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
+        }
+    }
+
+    private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
+        }
+        FeedRuntimeId runtimeId = null;
+        FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
+        if (endFeedMessage.isCompleteDisconnection()) {
+            // subscribableRuntimeType represents the location at which the feed connection receives data
+            FeedRuntimeType runtimeType = null;
+            switch (subscribableRuntimeType) {
+                case INTAKE:
+                    runtimeType = FeedRuntimeType.COLLECT;
+                    break;
+                case COMPUTE:
+                    runtimeType = FeedRuntimeType.COMPUTE_COLLECT;
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
+            }
+
+            runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
+            CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(
+                    connectionId, runtimeId);
+            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
+            }
+        } else {
+            // subscribaleRuntimeType represents the location for data hand-off in presence of subscribers
+            switch (subscribableRuntimeType) {
+                case INTAKE:
+                    // illegal state as data hand-off from one feed to another does not happen at intake
+                    throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
+                case COMPUTE:
+                    // feed could be primary or secondary, doesn't matter
+                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(
+                            connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
+                    ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager().getSubscribableRuntime(
+                            feedSubscribableRuntimeId);
+                    DistributeFeedFrameWriter dWriter = (DistributeFeedFrameWriter) feedRuntime.getFeedFrameWriter();
+                    Map<IFrameWriter, FeedFrameCollector> registeredCollectors = dWriter.getRegisteredReaders();
+
+                    IFrameWriter unsubscribingWriter = null;
+                    for (Entry<IFrameWriter, FeedFrameCollector> entry : registeredCollectors.entrySet()) {
+                        IFrameWriter frameWriter = entry.getKey();
+                        FeedRuntimeInputHandler feedFrameWriter = (FeedRuntimeInputHandler) frameWriter;
+                        if (feedFrameWriter.getConnectionId().equals(endFeedMessage.getFeedConnectionId())) {
+                            unsubscribingWriter = feedFrameWriter;
+                            dWriter.unsubscribeFeed(unsubscribingWriter);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Partial Unsubscription of " + unsubscribingWriter);
+                            }
+                            break;
+                        }
+                    }
+                    break;
+            }
+
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Unsubscribed from feed :" + connectionId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
new file mode 100644
index 0000000..371e5b6
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
@@ -0,0 +1,207 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.SubscribableRuntime;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
+
+    /** Runtime node pushable corresponding to the core feed operator **/
+    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+    /**
+     * A policy enforcer that ensures dynamic decisions for a feed are taken
+     * in accordance with the associated ingestion policy
+     **/
+    private FeedPolicyEnforcer policyEnforcer;
+
+    /**
+     * The Feed Runtime instance associated with the operator. Feed Runtime
+     * captures the state of the operator while the feed is active.
+     */
+    private FeedRuntime feedRuntime;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents
+     * the flow of data from a feed to a dataset.
+     **/
+    private FeedConnectionId connectionId;
+
+    /**
+     * Denotes the i'th operator instance in a setting where K operator
+     * instances are scheduled to run in parallel
+     **/
+    private int partition;
+
+    private int nPartitions;
+
+    /** The (singleton) instance of IFeedManager **/
+    private IFeedManager feedManager;
+
+    private FrameTupleAccessor fta;
+
+    private final IHyracksTaskContext ctx;
+
+    private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
+
+    private FeedRuntimeInputHandler inputSideHandler;
+
+    public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+        this.ctx = ctx;
+        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject()).getFeedManager();
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = runtimeCtx.getFeedManager();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        FeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(connectionId.getFeedId(), runtimeType, partition);
+        try {
+            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+            if (feedRuntime == null) {
+                initializeNewFeedRuntime(runtimeId);
+            } else {
+                reviveOldFeedRuntime(runtimeId);
+            }
+            writer.open();
+            coreOperator.open();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
+                nPartitions);
+
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
+                runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+
+        feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
+                recordDesc);
+        feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime) feedRuntime);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+
+        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+    }
+
+    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = feedRuntime.getInputHandler();
+        this.inputSideHandler.setCoreOperator(coreOperator);
+
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
+                runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+
+        inputSideHandler.reset(nPartitions);
+        feedRuntime.setMode(Mode.PROCESS);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            inputSideHandler.nextFrame(buffer);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
+        }
+        feedRuntime.setMode(Mode.FAIL);
+        coreOperator.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+        boolean end = inputSideHandler.getMode().equals(Mode.END);
+        try {
+            if (inputSideHandler != null) {
+                if (!(stalled || end)) {
+                    inputSideHandler.nextFrame(null); // signal end of data
+                    while (!inputSideHandler.isFinished()) {
+                        synchronized (coreOperator) {
+                            coreOperator.wait();
+                        }
+                    }
+                } else {
+                    inputSideHandler.setFinished(true);
+                }
+            }
+            coreOperator.close();
+            System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (!stalled) {
+                deregister();
+                System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            } else {
+                System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            }
+            if (inputSideHandler != null) {
+                inputSideHandler.close();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
+            }
+        }
+    }
+
+    private void deregister() {
+        if (feedRuntime != null) {
+            // deregister from subscription manager
+            SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId) feedRuntime.getRuntimeId();
+            feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
+
+            // deregister from connection manager
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+                    ((FeedRuntime) feedRuntime).getRuntimeId());
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java
new file mode 100644
index 0000000..d348cfc
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaNodePushable.java
@@ -0,0 +1,170 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());
+
+    /** Runtime node pushable corresponding to the core feed operator **/
+    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+    /**
+     * A policy enforcer that ensures dyanmic decisions for a feed are taken
+     * in accordance with the associated ingestion policy
+     **/
+    private FeedPolicyEnforcer policyEnforcer;
+
+    /**
+     * The Feed Runtime instance associated with the operator. Feed Runtime
+     * captures the state of the operator while the feed is active.
+     */
+    private FeedRuntime feedRuntime;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents
+     * the flow of data from a feed to a dataset.
+     **/
+    private FeedConnectionId connectionId;
+
+    /**
+     * Denotes the i'th operator instance in a setting where K operator
+     * instances are scheduled to run in parallel
+     **/
+    private int partition;
+
+    /** Total number of partitions available **/
+    private int nPartitions;
+
+    /** Type associated with the core feed operator **/
+    private final FeedRuntimeType runtimeType = FeedRuntimeType.OTHER;
+
+    /** The (singleton) instance of IFeedManager **/
+    private IFeedManager feedManager;
+
+    private FrameTupleAccessor fta;
+
+    private final IHyracksTaskContext ctx;
+
+    private final String operandId;
+
+    /** The pre-processor associated with this runtime **/
+    private FeedRuntimeInputHandler inputSideHandler;
+
+    public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+        this.ctx = ctx;
+        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject()).getFeedManager();
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = runtimeCtx.getFeedManager();
+        this.operandId = operationId;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        try {
+            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+            if (feedRuntime == null) {
+                initializeNewFeedRuntime(runtimeId);
+            } else {
+                reviveOldFeedRuntime(runtimeId);
+            }
+            coreOperator.open();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId,
+                (AbstractUnaryInputUnaryOutputOperatorNodePushable) coreOperator,
+                policyEnforcer.getFeedPolicyAccessor(), false, fta, recordDesc, feedManager,
+                nPartitions);
+
+        setupBasicRuntime(inputSideHandler);
+    }
+
+    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.inputSideHandler = feedRuntime.getInputHandler();
+        this.fta = new FrameTupleAccessor(recordDesc);
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        feedRuntime.setMode(Mode.PROCESS);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Retreived state from the zombie instance " + runtimeType + " node.");
+        }
+    }
+
+    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            inputSideHandler.nextFrame(buffer);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+        }
+        feedRuntime.setMode(Mode.FAIL);
+        coreOperator.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            coreOperator.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+            // ignore
+        } finally {
+            if (inputSideHandler != null) {
+                inputSideHandler.close();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index d636e04..953853d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -1,75 +1,80 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Map;
 
-import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeState;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 
 /**
  * FeedMetaOperatorDescriptor is a wrapper operator that provides a sanboox like
- * environment for an hyracks operator that is part of a feed ingestion pipeline.
- * The MetaFeed operator provides an interface iden- tical to that offered by the
- * underlying wrapped operator, hereafter referred to as the core operator.
- * As seen by Hyracks, the altered pipeline is identical to the earlier version formed
- * from core operators. The MetaFeed operator enhances each core operator by providing
- * functionality for handling runtime exceptions, saving any state for future retrieval,
- * and measuring/reporting of performance characteristics. We next describe how the added
- * functionality contributes to providing fault- tolerance.
+ * environment for an hyracks operator that is part of a feed ingestion
+ * pipeline. The MetaFeed operator provides an interface iden- tical to that
+ * offered by the underlying wrapped operator, hereafter referred to as the core
+ * operator. As seen by Hyracks, the altered pipeline is identical to the
+ * earlier version formed from core operators. The MetaFeed operator enhances
+ * each core operator by providing functionality for handling runtime
+ * exceptions, saving any state for future retrieval, and measuring/reporting of
+ * performance characteristics. We next describe how the added functionality
+ * contributes to providing fault- tolerance.
  */
 
 public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaOperatorDescriptor.class.getName());
-
-    /** The actual (Hyracks) operator that is wrapped around by the Metafeed Adapter **/
+    /**
+     * The actual (Hyracks) operator that is wrapped around by the MetaFeed
+     * operator.
+     **/
     private IOperatorDescriptor coreOperator;
 
     /**
-     * A unique identifier for the feed instance. A feed instance represents the flow of data
-     * from a feed to a dataset.
+     * A unique identifier for the feed instance. A feed instance represents the
+     * flow of data from a feed to a dataset.
      **/
     private final FeedConnectionId feedConnectionId;
 
     /**
      * The policy associated with the feed instance.
-     */
-    private final FeedPolicy feedPolicy;
+     **/
+    private final Map<String, String> feedPolicyProperties;
 
     /**
      * type for the feed runtime associated with the operator.
-     * Possible values: INGESTION, COMPUTE, STORAGE, COMMIT
-     */
+     * Possible values: COMPUTE, STORE, OTHER
+     **/
     private final FeedRuntimeType runtimeType;
 
     private final String operandId;
 
     public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
-            IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType,
-            String operandId) {
+            IOperatorDescriptor coreOperatorDescriptor, Map<String, String> feedPolicyProperties,
+            FeedRuntimeType runtimeType, boolean enableSubscriptionMode, String operandId) {
         super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
-        this.feedPolicy = feedPolicy;
+        this.feedPolicyProperties = feedPolicyProperties;
         if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
             recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
         }
@@ -81,8 +86,30 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
-                feedConnectionId, feedPolicy, runtimeType, operandId);
+        IOperatorNodePushable nodePushable = null;
+        switch (runtimeType) {
+            case COMPUTE:
+                nodePushable = new FeedMetaComputeNodePushable(ctx, recordDescProvider, partition, nPartitions,
+                        coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+                break;
+            case STORE:
+                nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
+                        coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+                break;
+            case OTHER:
+                nodePushable = new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
+                        feedConnectionId, feedPolicyProperties, operandId);
+                break;
+            case ETS:
+                nodePushable = ((AlgebricksMetaOperatorDescriptor) coreOperator).createPushRuntime(ctx,
+                        recordDescProvider, partition, nPartitions);
+                break;
+            case JOIN:
+                break;
+            default:
+                throw new HyracksDataException(new IllegalArgumentException("Invalid feed runtime: " + runtimeType));
+        }
+        return nodePushable;
     }
 
     @Override
@@ -90,173 +117,12 @@ public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDe
         return "FeedMeta [" + coreOperator + " ]";
     }
 
-    private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-        /** Runtime node pushable corresponding to the core feed operator **/
-        private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
-
-        /**
-         * A policy enforcer that ensures dyanmic decisions for a feed are taken in accordance
-         * with the associated ingestion policy
-         **/
-        private FeedPolicyEnforcer policyEnforcer;
-
-        /**
-         * The Feed Runtime instance associated with the operator. Feed Runtime captures the state of the operator while
-         * the feed is active.
-         */
-        private FeedRuntime feedRuntime;
-
-        /**
-         * A unique identifier for the feed instance. A feed instance represents the flow of data
-         * from a feed to a dataset.
-         **/
-        private FeedConnectionId feedId;
-
-        /** Denotes the i'th operator instance in a setting where K operator instances are scheduled to run in parallel **/
-        private int partition;
-
-        /** A buffer that is used to hold the current frame that is being processed **/
-        private ByteBuffer currentBuffer;
-
-        /** Type associated with the core feed operator **/
-        private final FeedRuntimeType runtimeType;
-
-        /** True is the feed is recovering from a previous failed execution **/
-        private boolean resumeOldState;
-
-        /** The Node Controller ID for the host NC **/
-
-        private String nodeId;
-
-        /** Allows to iterate over the tuples in a frame **/
-        private FrameTupleAccessor fta;
-
-        /** The (singleton) instance of IFeedManager **/
-        private IFeedManager feedManager;
-
-        private final String operandId;
-
-        public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-                int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-                FeedPolicy feedPolicy, FeedRuntimeType runtimeType, String operationId) throws HyracksDataException {
-            this.coreOperatorNodePushable = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                    .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-            this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicy.getProperties());
-            this.partition = partition;
-            this.runtimeType = runtimeType;
-            this.feedId = feedConnectionId;
-            this.nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
-            fta = new FrameTupleAccessor(recordDesc);
-            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
-            this.feedManager = runtimeCtx.getFeedManager();
-            this.operandId = operationId;
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-            FeedRuntimeId runtimeId = new FeedRuntimeId(feedId, runtimeType, operandId, partition);
-            try {
-                feedRuntime = feedManager.getFeedRuntime(runtimeId);
-                if (feedRuntime == null) {
-                    feedRuntime = new FeedRuntime(feedId, partition, runtimeType, operandId);
-                    feedManager.registerFeedRuntime(feedRuntime);
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Did not find a saved state from a previous zombie, starting a new instance for "
-                                + runtimeType + " node.");
-                    }
-                    resumeOldState = false;
-                } else {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Retreived state from the zombie instance from previous execution for "
-                                + runtimeType + " node.");
-                    }
-                    resumeOldState = true;
-                }
-                FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
-                        runtimeType, partition, fta, feedManager);
-                coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
-                coreOperatorNodePushable.open();
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.SEVERE)) {
-                    LOGGER.severe("Unable to initialize feed operator " + feedRuntime + " [" + partition + "]");
-                }
-                throw new HyracksDataException(e);
-            }
-        }
-
-        @Override
-        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            try {
-                if (resumeOldState) {
-                    FeedRuntimeState runtimeState = feedRuntime.getRuntimeState();
-                    if (runtimeState != null) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("State from previous zombie instance " + feedRuntime.getRuntimeState());
-                        }
-                        coreOperatorNodePushable.nextFrame(feedRuntime.getRuntimeState().getFrame());
-                        feedRuntime.setRuntimeState(null);
-                    }
-                    resumeOldState = false;
-                }
-                currentBuffer = buffer;
-                coreOperatorNodePushable.nextFrame(buffer);
-                currentBuffer = null;
-            } catch (HyracksDataException e) {
-                if (policyEnforcer.getFeedPolicyAccessor().continueOnApplicationFailure()) {
-                    boolean isExceptionHarmful = e.getCause() instanceof TreeIndexException && !resumeOldState;
-                    if (isExceptionHarmful) {
-                        // TODO: log the tuple
-                        FeedRuntimeState runtimeState = new FeedRuntimeState(buffer, writer, e);
-                        feedRuntime.setRuntimeState(runtimeState);
-                    } else {
-                        // ignore the frame (exception is expected)
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Ignoring exception " + e);
-                        }
-                    }
-                } else {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Feed policy does not require feed to survive soft failure");
-                    }
-                    throw e;
-                }
-            }
-        }
-
-        @Override
-        public void fail() throws HyracksDataException {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " fail ");
-            }
-            if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
-                if (currentBuffer != null) {
-                    FeedRuntimeState runtimeState = new FeedRuntimeState(currentBuffer, writer, null);
-                    feedRuntime.setRuntimeState(runtimeState);
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Saved feed compute runtime for revivals" + feedRuntime.getFeedRuntimeId());
-                    }
-                } else {
-                    feedManager.deRegisterFeedRuntime(feedRuntime.getFeedRuntimeId());
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("No state to save, de-registered feed runtime " + feedRuntime.getFeedRuntimeId());
-                    }
-                }
-            }
-            coreOperatorNodePushable.fail();
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            coreOperatorNodePushable.close();
-            feedManager.deRegisterFeedRuntime(feedRuntime.getFeedRuntimeId());
-        }
-
-    }
-
     public IOperatorDescriptor getCoreOperator() {
         return coreOperator;
     }
 
+    public FeedRuntimeType getRuntimeType() {
+        return runtimeType;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
new file mode 100644
index 0000000..1bb377c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
@@ -0,0 +1,198 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetaStoreNodePushable.class.getName());
+
+    /** Runtime node pushable corresponding to the core feed operator **/
+    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+    /**
+     * A policy enforcer that ensures dyanmic decisions for a feed are taken
+     * in accordance with the associated ingestion policy
+     **/
+    private FeedPolicyEnforcer policyEnforcer;
+
+    /**
+     * The Feed Runtime instance associated with the operator. Feed Runtime
+     * captures the state of the operator while the feed is active.
+     */
+    private FeedRuntime feedRuntime;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents
+     * the flow of data from a feed to a dataset.
+     **/
+    private FeedConnectionId connectionId;
+
+    /**
+     * Denotes the i'th operator instance in a setting where K operator
+     * instances are scheduled to run in parallel
+     **/
+    private int partition;
+
+    private int nPartitions;
+
+    /** Type associated with the core feed operator **/
+    private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
+
+    /** The (singleton) instance of IFeedManager **/
+    private IFeedManager feedManager;
+
+    private FrameTupleAccessor fta;
+
+    private final IHyracksTaskContext ctx;
+
+    private final String operandId;
+
+    private FeedRuntimeInputHandler inputSideHandler;
+
+    public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+        this.ctx = ctx;
+        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject()).getFeedManager();
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = runtimeCtx.getFeedManager();
+        this.operandId = operationId;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        try {
+            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+            if (feedRuntime == null) {
+                initializeNewFeedRuntime(runtimeId);
+            } else {
+                reviveOldFeedRuntime(runtimeId);
+            }
+
+            coreOperator.open();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Runtime not found for  " + runtimeId + " connection id " + connectionId);
+        }
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
+                nPartitions);
+        setupBasicRuntime(inputSideHandler);
+    }
+
+    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.inputSideHandler = feedRuntime.getInputHandler();
+        this.fta = new FrameTupleAccessor(recordDesc);
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        this.inputSideHandler.reset(nPartitions);
+        this.inputSideHandler.setCoreOperator(coreOperator);
+        feedRuntime.setMode(Mode.PROCESS);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Retreived state from the zombie instance from previous execution for " + runtimeType
+                    + " node.");
+        }
+    }
+
+    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, (FeedRuntime) feedRuntime);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            inputSideHandler.nextFrame(buffer);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+        }
+        feedRuntime.setMode(Mode.FAIL);
+        coreOperator.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        System.out.println("CLOSE CALLED FOR " + this.feedRuntime.getRuntimeId());
+        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+        try {
+            if (!stalled) {
+                System.out.println("SIGNALLING END OF DATA for " + this.feedRuntime.getRuntimeId() + " mode is "
+                        + inputSideHandler.getMode() + " WAITING ON " + coreOperator);
+                inputSideHandler.nextFrame(null); // signal end of data
+                while (!inputSideHandler.isFinished()) {
+                    synchronized (coreOperator) {
+                        coreOperator.wait();
+                    }
+                }
+                System.out.println("ABOUT TO CLOSE OPERATOR  " + coreOperator);
+            }
+            coreOperator.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+            // ignore
+        } finally {
+            if (!stalled) {
+                deregister();
+                System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            } else {
+                System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            }
+            inputSideHandler.close();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
+            }
+        }
+    }
+
+    private void deregister() {
+        if (feedRuntime != null) {
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+                    ((FeedRuntime) feedRuntime).getRuntimeId());
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
deleted file mode 100644
index fd9716c..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.Map;
-
-public class FeedPolicyAccessor {
-    public static final String FAILURE_LOG_ERROR = "failure.log.error";
-    public static final String APPLICATION_FAILURE_LOG_DATA = "application.failure.log.data";
-    public static final String APPLICATION_FAILURE_CONTINUE = "application.failure.continue";
-    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
-    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
-    public static final String COLLECT_STATISTICS = "collect.statistics";
-    public static final String COLLECT_STATISTICS_PERIOD = "collect.statistics.period";
-    public static final String COLLECT_STATISTICS_PERIOD_UNIT = "collect.statistics.period.unit";
-    public static final String ELASTIC = "elastic";
-
-    public enum TimeUnit {
-        SEC,
-        MIN,
-        HRS,
-        DAYS
-    }
-
-    private Map<String, String> feedPolicy;
-
-    public FeedPolicyAccessor(Map<String, String> feedPolicy) {
-        this.feedPolicy = feedPolicy;
-    }
-
-    public boolean logErrorOnFailure() {
-        return getBooleanPropertyValue(FAILURE_LOG_ERROR);
-    }
-
-    public boolean logDataOnApplicationFailure() {
-        return getBooleanPropertyValue(APPLICATION_FAILURE_LOG_DATA);
-    }
-
-    public boolean continueOnApplicationFailure() {
-        return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE);
-    }
-
-    public boolean continueOnHardwareFailure() {
-        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE);
-    }
-
-    public boolean autoRestartOnClusterReboot() {
-        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART);
-    }
-
-    public boolean collectStatistics() {
-        return getBooleanPropertyValue(COLLECT_STATISTICS);
-    }
-
-    public long getStatisicsCollectionPeriodInSecs() {
-        return getIntegerPropertyValue(COLLECT_STATISTICS_PERIOD) * getTimeUnitFactor();
-    }
-
-    public boolean isElastic() {
-        return getBooleanPropertyValue(ELASTIC);
-    }
-
-    private int getTimeUnitFactor() {
-        String v = feedPolicy.get(COLLECT_STATISTICS_PERIOD_UNIT);
-        int factor = 1;
-        switch (TimeUnit.valueOf(v)) {
-            case SEC:
-                factor = 1;
-                break;
-            case MIN:
-                factor = 60;
-                break;
-            case HRS:
-                factor = 3600;
-                break;
-            case DAYS:
-                factor = 216000;
-                break;
-
-        }
-        return factor;
-    }
-
-    private boolean getBooleanPropertyValue(String key) {
-        String v = feedPolicy.get(key);
-        return v == null ? false : Boolean.valueOf(v);
-    }
-
-    private int getIntegerPropertyValue(String key) {
-        String v = feedPolicy.get(key);
-        return Integer.parseInt(v);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
index 44487ec..5cd4bf1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
@@ -15,60 +15,32 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.rmi.RemoteException;
-import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
 
 public class FeedPolicyEnforcer {
 
-    private final FeedConnectionId feedId;
-    private final FeedPolicyAccessor feedPolicyAccessor;
-    private final FeedActivity feedActivity;
+    private final FeedConnectionId connectionId;
+    private final FeedPolicyAccessor policyAccessor;
 
-    public FeedPolicyEnforcer(FeedConnectionId feedId, Map<String, String> feedPolicy) {
-        this.feedId = feedId;
-        this.feedPolicyAccessor = new FeedPolicyAccessor(feedPolicy);
-        this.feedActivity = new FeedActivity(feedId.getDataverse(), feedId.getFeedName(), feedId.getDatasetName(),
-                null, new HashMap<String, String>());
+    public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, String> feedPolicy) {
+        this.connectionId = feedConnectionId;
+        this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
     }
 
     public boolean continueIngestionPostSoftwareFailure(Exception e) throws RemoteException, ACIDException {
-        boolean continueIngestion = feedPolicyAccessor.continueOnApplicationFailure();
-        if (feedPolicyAccessor.logErrorOnFailure()) {
-            persistExceptionDetails(e);
-        }
-        return continueIngestion;
-    }
-
-    private synchronized void persistExceptionDetails(Exception e) throws RemoteException, ACIDException {
-        MetadataManager.INSTANCE.acquireWriteLatch();
-        MetadataTransactionContext ctx = null;
-        try {
-            ctx = MetadataManager.INSTANCE.beginTransaction();
-            feedActivity.setActivityType(FeedActivityType.FEED_FAILURE);
-            feedActivity.getFeedActivityDetails().put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE,
-                    e.getMessage());
-            MetadataManager.INSTANCE.registerFeedActivity(ctx, feedId, feedActivity);
-            MetadataManager.INSTANCE.commitTransaction(ctx);
-        } catch (Exception e2) {
-            MetadataManager.INSTANCE.abortTransaction(ctx);
-        } finally {
-            MetadataManager.INSTANCE.releaseWriteLatch();
-        }
+        return policyAccessor.continueOnSoftFailure();
     }
 
     public FeedPolicyAccessor getFeedPolicyAccessor() {
-        return feedPolicyAccessor;
+        return policyAccessor;
     }
 
     public FeedConnectionId getFeedId() {
-        return feedId;
+        return connectionId;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
deleted file mode 100644
index d3225a2..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager.FeedReportMessageType;
-
-public class FeedReport implements Comparable {
-
-    private FeedConnectionId feedId;
-    private FeedReportMessageType reportType;
-    private int partition = -1;
-    private FeedRuntimeType runtimeType;
-    private long value = -1;
-    private String[] representation;
-
-    public FeedReport() {
-    }
-
-    public FeedReport(String message) {
-        representation = message.split("\\|");
-    }
-
-    public void reset(String message) {
-        representation = message.split("\\|");
-        reportType = null;
-        feedId = null;
-        runtimeType = null;
-        partition = -1;
-        value = -1;
-    }
-
-    @Override
-    public String toString() {
-        return getFeedId() + " " + getReportType() + " " + getPartition() + " " + getRuntimeType() + " " + getValue();
-    }
-
-    public FeedConnectionId getFeedId() {
-        if (feedId == null) {
-            String feedIdRep = representation[1];
-            String[] feedIdComp = feedIdRep.split(":");
-            feedId = new FeedConnectionId(feedIdComp[0], feedIdComp[1], feedIdComp[2]);
-        }
-        return feedId;
-    }
-
-    public FeedReportMessageType getReportType() {
-        if (reportType == null) {
-            reportType = FeedReportMessageType.valueOf(representation[0].toUpperCase());
-        }
-        return reportType;
-    }
-
-    public int getPartition() {
-        if (partition < 0) {
-            partition = Integer.parseInt(representation[3]);
-        }
-        return partition;
-    }
-
-    public FeedRuntimeType getRuntimeType() {
-        if (runtimeType == null) {
-            runtimeType = FeedRuntimeType.valueOf(representation[2].toUpperCase());
-        }
-        return runtimeType;
-    }
-
-    public long getValue() {
-        if (value < 0) {
-            value = Long.parseLong(representation[4]);
-        }
-        return value;
-    }
-
-    public String[] getRepresentation() {
-        return representation;
-    }
-
-    @Override
-    public int compareTo(Object o) {
-        if (!(o instanceof FeedReport)) {
-            throw new IllegalArgumentException("Incorrect operand type " + o);
-        }
-
-        FeedReport other = (FeedReport) o;
-        if (!other.getReportType().equals(getReportType())) {
-            throw new IllegalArgumentException("Incorrect operand type " + o);
-        }
-
-        int returnValue = 0;
-
-        switch (getReportType()) {
-            case CONGESTION:
-                returnValue = ranking.get(getRuntimeType()) - ranking.get(other.getRuntimeType());
-                break;
-
-            case THROUGHPUT:
-                returnValue = (int) (other.getValue() - getValue());
-                break;
-        }
-
-        return returnValue;
-    }
-
-    private static Map<FeedRuntimeType, Integer> ranking = populateRanking();
-
-    private static Map<FeedRuntimeType, Integer> populateRanking() {
-        Map<FeedRuntimeType, Integer> ranking = new HashMap<FeedRuntimeType, Integer>();
-        ranking.put(FeedRuntimeType.INGESTION, 1);
-        ranking.put(FeedRuntimeType.COMPUTE, 2);
-        ranking.put(FeedRuntimeType.STORAGE, 3);
-        ranking.put(FeedRuntimeType.COMMIT, 4);
-        return ranking;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java
new file mode 100644
index 0000000..48102ba
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedSubscriptionManager.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+
+public class FeedSubscriptionManager implements IFeedSubscriptionManager {
+
+    private static Logger LOGGER = Logger.getLogger(FeedSubscriptionManager.class.getName());
+
+    private final String nodeId;
+
+    private final Map<SubscribableFeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
+
+    public FeedSubscriptionManager(String nodeId) {
+        this.nodeId = nodeId;
+        this.subscribableRuntimes = new HashMap<SubscribableFeedRuntimeId, ISubscribableRuntime>();
+    }
+
+    @Override
+    public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
+        SubscribableFeedRuntimeId sid = (SubscribableFeedRuntimeId) subscribableRuntime.getRuntimeId();
+        if (!subscribableRuntimes.containsKey(sid)) {
+            subscribableRuntimes.put(sid, subscribableRuntime);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Registered feed subscribable runtime " + subscribableRuntime);
+            }
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Feed ingestion runtime " + subscribableRuntime + " already registered.");
+            }
+        }
+    }
+
+    @Override
+    public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableFeedRuntimeId) {
+        return subscribableRuntimes.get(subscribableFeedRuntimeId);
+    }
+
+    @Override
+    public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId ingestionId) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("De-registered feed subscribable runtime " + ingestionId);
+        }
+        subscribableRuntimes.remove(ingestionId);
+    }
+
+    @Override
+    public String toString() {
+        return "IngestionManager [" + nodeId + "]";
+    }
+
+}