Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:02 UTC

[05/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
new file mode 100644
index 0000000..48d1dc6
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorNodePushable.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedCollectRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector.State;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * The runtime for @see{FeedCollectOperatorDescriptor}
+ */
+public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
+
+    private final int partition;
+    private final FeedConnectionId connectionId;
+    private final Map<String, String> feedPolicy;
+    private final FeedPolicyAccessor policyAccessor;
+    private final IFeedManager feedManager;
+    private final ISubscribableRuntime sourceRuntime;
+    private final IHyracksTaskContext ctx;
+    private final int nPartitions;
+
+    private RecordDescriptor outputRecordDescriptor;
+    private FeedRuntimeInputHandler inputSideHandler;
+    private CollectionRuntime collectRuntime;
+
+    public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
+            FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
+            ISubscribableRuntime sourceRuntime) {
+        this.ctx = ctx;
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.sourceRuntime = sourceRuntime;
+        this.feedPolicy = feedPolicy;
+        policyAccessor = new FeedPolicyAccessor(feedPolicy);
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = runtimeCtx.getFeedManager();
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        try {
+            outputRecordDescriptor = recordDesc;
+            FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
+                    .getFeedRuntimeType();
+            switch (sourceRuntimeType) {
+                case INTAKE:
+                    handleCompleteConnection();
+                    break;
+                case COMPUTE:
+                    handlePartialConnection();
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
+            }
+
+            State state = collectRuntime.waitTillCollectionOver();
+            if (state.equals(State.FINISHED)) {
+                feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+                        collectRuntime.getRuntimeId());
+                writer.close();
+                inputSideHandler.close();
+            } else if (state.equals(State.HANDOVER)) {
+                inputSideHandler.setMode(Mode.STALL);
+                writer.close();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
+                            + " and the output writer " + writer + " has been closed ");
+                }
+            }
+        } catch (InterruptedException ie) {
+            handleInterruptedException(ie);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void handleCompleteConnection() throws Exception {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
+                FeedRuntimeId.DEFAULT_OPERAND_ID);
+        collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
+                runtimeId);
+        if (collectRuntime == null) {
+            beginNewFeed(runtimeId);
+        } else {
+            reviveOldFeed();
+        }
+    }
+
+    private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
+        writer.open();
+        IFrameWriter outputSideWriter = writer;
+        if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType().equals(
+                FeedRuntimeType.COMPUTE)) {
+            outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
+                    connectionId);
+            this.recordDesc = sourceRuntime.getRecordDescriptor();
+        }
+
+        FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
+        inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter,
+                policyAccessor, false, tupleAccessor, recordDesc,
+                feedManager, nPartitions);
+
+        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
+                sourceRuntime, feedPolicy);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+    }
+
+    private void reviveOldFeed() throws HyracksDataException {
+        writer.open();
+        collectRuntime.getFrameCollector().setState(State.ACTIVE);
+        inputSideHandler = collectRuntime.getInputHandler();
+
+        IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
+        if (innerWriter instanceof CollectTransformFeedFrameWriter) {
+            ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
+        } else {
+            inputSideHandler.setCoreOperator(writer);
+        }
+
+        inputSideHandler.setMode(Mode.PROCESS);
+    }
+
+    private void handlePartialConnection() throws Exception {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
+                FeedRuntimeId.DEFAULT_OPERAND_ID);
+        writer.open();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
+        }
+        IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
+                outputRecordDescriptor, connectionId);
+
+        inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
+                new FrameTupleAccessor(recordDesc), recordDesc, feedManager,
+                nPartitions);
+
+        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
+                feedPolicy);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+        recordDesc = sourceRuntime.getRecordDescriptor();
+        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+    }
+
+    private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
+        if (policyAccessor.continueOnHardwareFailure()) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
+                        + " until failure is resolved");
+            }
+            inputSideHandler.setMode(Mode.STALL);
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
+                        + " as feed is not configured to handle failures");
+            }
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
+            writer.close();
+            throw new HyracksDataException(ie);
+        }
+    }
+
+}
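
The collect operator never reads the feed policy Map directly; it wraps the Map in a FeedPolicyAccessor at construction time and consults accessor methods such as continueOnHardwareFailure() to decide between stalling (Mode.STALL) and tearing down when ingestion is interrupted. A minimal sketch of that relationship, not part of the patch, assuming the imports of the class above plus java.util.HashMap; the policy key name is a hypothetical stand-in for whatever the feed policy classes actually define:

    // Sketch only: "continue.on.hardware.failure" is a hypothetical key used for illustration.
    Map<String, String> feedPolicy = new HashMap<String, String>();
    feedPolicy.put("continue.on.hardware.failure", "true");
    FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy);
    if (policyAccessor.continueOnHardwareFailure()) {
        // handleInterruptedException() above switches the input side handler to Mode.STALL
    } else {
        // otherwise the runtime is deregistered, the writer is closed, and the failure is rethrown
    }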

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java
new file mode 100644
index 0000000..eefb576
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedConnectionManager.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntime;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
+
+/**
+ * An implementation of the IFeedConnectionManager interface.
+ * Provides the central repository for registering/retrieving
+ * artifacts/services associated with a feed.
+ */
+public class FeedConnectionManager implements IFeedConnectionManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
+
+    private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
+    private final String nodeId;
+
+    public FeedConnectionManager(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
+        return feedRuntimeManagers.get(feedId);
+    }
+
+    @Override
+    public void deregisterFeed(FeedConnectionId feedId) {
+        try {
+            FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+            if (mgr != null) {
+                mgr.close();
+                feedRuntimeManagers.remove(feedId);
+            }
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
+            }
+        }
+
+    }
+
+    @Override
+    public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
+            throws Exception {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        if (runtimeMgr == null) {
+            runtimeMgr = new FeedRuntimeManager(connectionId, this);
+            feedRuntimeManagers.put(connectionId, runtimeMgr);
+        }
+        runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
+    }
+
+    @Override
+    public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        if (runtimeMgr != null) {
+            runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
+        }
+    }
+
+    @Override
+    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
+    }
+
+    @Override
+    public String toString() {
+        return "FeedManager " + "[" + nodeId + "]";
+    }
+
+    @Override
+    public List<FeedRuntimeId> getRegisteredRuntimes() {
+        List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
+        for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
+            runtimes.addAll(entry.getValue().getFeedRuntimes());
+        }
+        return runtimes;
+    }
+}
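
The connection manager keeps one FeedRuntimeManager per active FeedConnectionId and exposes register, lookup, and deregister operations keyed by the pair (connectionId, runtimeId); the collect operator above follows exactly this sequence. A minimal usage sketch, not part of the patch, assuming the imports used by the classes above and that a FeedConnectionId and a FeedRuntime instance (here a CollectionRuntime) are built elsewhere:

    // Sketch only; connectionId and collectRuntime are assumed to be created by the caller,
    // e.g. by FeedCollectOperatorNodePushable earlier in this patch.
    FeedConnectionManager manager = new FeedConnectionManager("nc1");
    FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, 0, FeedRuntimeId.DEFAULT_OPERAND_ID);
    if (manager.getFeedRuntime(connectionId, runtimeId) == null) {
        manager.registerFeedRuntime(connectionId, collectRuntime); // first activation; declares throws Exception
    }
    // ... feed runs ...
    manager.deRegisterFeedRuntime(connectionId, runtimeId);        // tear-down once collection is over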

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java
new file mode 100644
index 0000000..d131b95
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameTupleDecorator.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class FeedFrameTupleDecorator {
+
+    private AMutableString aString = new AMutableString("");
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AtomicInteger tupleId;
+
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+
+    private final int partition;
+    private final ArrayBackedValueStorage attrNameStorage;
+    private final ArrayBackedValueStorage attrValueStorage;
+
+    public FeedFrameTupleDecorator(int partition) {
+        this.tupleId = new AtomicInteger(0);
+        this.partition = partition;
+        this.attrNameStorage = new ArrayBackedValueStorage();
+        this.attrValueStorage = new ArrayBackedValueStorage();
+    }
+
+    public void addLongAttribute(String attrName, long attrValue, IARecordBuilder recordBuilder)
+            throws HyracksDataException, AsterixException {
+        attrNameStorage.reset();
+        aString.setValue(attrName);
+        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
+
+        attrValueStorage.reset();
+        aInt64.setValue(attrValue);
+        int64Serde.serialize(aInt64, attrValueStorage.getDataOutput());
+
+        recordBuilder.addField(attrNameStorage, attrValueStorage);
+    }
+
+    public void addIntegerAttribute(String attrName, int attrValue, IARecordBuilder recordBuilder)
+            throws HyracksDataException, AsterixException {
+        attrNameStorage.reset();
+        aString.setValue(attrName);
+        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
+
+        attrValueStorage.reset();
+        aInt32.setValue(attrValue);
+        int32Serde.serialize(aInt32, attrValueStorage.getDataOutput());
+
+        recordBuilder.addField(attrNameStorage, attrValueStorage);
+    }
+
+    public void addTupleId(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addIntegerAttribute(StatisticsConstants.INTAKE_TUPLEID, tupleId.incrementAndGet(), recordBuilder);
+    }
+
+    public void addIntakePartition(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addIntegerAttribute(StatisticsConstants.INTAKE_PARTITION, partition, recordBuilder);
+    }
+
+    public void addIntakeTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addLongAttribute(StatisticsConstants.INTAKE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
+    }
+
+    public void addStoreTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addLongAttribute(StatisticsConstants.STORE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
+    }
+
+}
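
FeedFrameTupleDecorator serializes a (name, value) pair into its two ArrayBackedValueStorage buffers and appends the pair as a field to an IARecordBuilder; this is how the intake-side statistics fields (tuple id, intake partition, intake/store timestamps from FeedConstants.StatisticsConstants) get stamped onto each record. A minimal usage sketch, not part of the patch, assuming an IARecordBuilder that has already been reset for the record being built:

    // Sketch only; recordBuilder is assumed to come from the record-building machinery
    // used by the feed pipeline, and partition is the intake partition number.
    FeedFrameTupleDecorator decorator = new FeedFrameTupleDecorator(partition);
    decorator.addTupleId(recordBuilder);          // monotonically increasing id, per decorator instance
    decorator.addIntakePartition(recordBuilder);  // the intake partition that produced the tuple
    decorator.addIntakeTimestamp(recordBuilder);  // System.currentTimeMillis() at intake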

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
deleted file mode 100644
index 899da77..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedMessageService;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager.FeedReportMessageType;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * A wrapper around the standard frame writer provided to an operator node pushable.
- * The wrapper monitors the flow of data from this operator to a downstream operator
- * over a connector. It collects statistics if required by the feed ingestion policy
- * and reports them to the Super Feed Manager chosen for the feed. In addition any
- * congestion experienced by the operator is also reported.
- */
-public class FeedFrameWriter implements IFrameWriter {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameWriter.class.getName());
-
-    /** The threshold for the time required in pushing a frame to the network. **/
-    public static final long FLUSH_THRESHOLD_TIME = 5000; // 5 seconds
-
-    /** Actual frame writer provided to an operator. **/
-    private IFrameWriter writer;
-
-    /** The node pushable associated with the operator **/
-    private IOperatorNodePushable nodePushable;
-
-    /** set to true if health need to be monitored **/
-    private final boolean reportHealth;
-
-    /** A buffer for keeping frames that are waiting to be processed **/
-    private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
-
-    /**
-     * Mode associated with the frame writer
-     * Possible values: FORWARD, STORE
-     * 
-     * @see Mode
-     */
-    private Mode mode;
-
-    /**
-     * Detects if the operator is unable to push a frame downstream
-     * within a threshold period of time. In addition, it measure the
-     * throughput as observed on the output channel of the associated operator.
-     */
-    private HealthMonitor healthMonitor;
-
-    /**
-     * A Timer instance for managing scheduling of tasks.
-     */
-    private Timer timer;
-
-    /**
-     * Provides access to the tuples in a frame. Used in collecting statistics
-     */
-    private FrameTupleAccessor fta;
-
-    public enum Mode {
-        /**
-         * **
-         * Normal mode of operation for an operator when
-         * frames are pushed to the downstream operator.
-         */
-        FORWARD,
-
-        /**
-         * Failure mode of operation for an operator when
-         * input frames are not pushed to the downstream operator but
-         * are buffered for future retrieval. This mode is adopted
-         * during failure recovery.
-         */
-        STORE
-    }
-
-    public FeedFrameWriter(IFrameWriter writer, IOperatorNodePushable nodePushable, FeedConnectionId feedId,
-            FeedPolicyEnforcer policyEnforcer, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
-            FrameTupleAccessor fta, IFeedManager feedManager) {
-        this.writer = writer;
-        this.mode = Mode.FORWARD;
-        this.nodePushable = nodePushable;
-        this.reportHealth = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
-        if (reportHealth) {
-            timer = new Timer();
-            healthMonitor = new HealthMonitor(feedId, nodeId, feedRuntimeType, partition, timer, fta, feedManager);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Statistics collection enabled for the feed " + feedId + " " + feedRuntimeType + " ["
-                        + partition + "]");
-            }
-            timer.scheduleAtFixedRate(healthMonitor, 0, FLUSH_THRESHOLD_TIME);
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Statistics collection *not* enabled for the feed " + feedId + " " + feedRuntimeType + " ["
-                        + partition + "]");
-            }
-        }
-        this.fta = fta;
-    }
-
-    public Mode getMode() {
-        return mode;
-    }
-
-    public void setMode(Mode newMode) throws HyracksDataException {
-        if (this.mode.equals(newMode)) {
-            return;
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Switching to :" + newMode + " from " + this.mode);
-        }
-        this.mode = newMode;
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        switch (mode) {
-            case FORWARD:
-                try {
-                    if (reportHealth) {
-                        fta.reset(buffer);
-                        healthMonitor.notifyStartFrameFlushActivity();
-                        writer.nextFrame(buffer);
-                        healthMonitor.notifyFinishFrameFlushActivity();
-                    } else {
-                        writer.nextFrame(buffer);
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Unable to write frame " + " on behalf of " + nodePushable.getDisplayName()
-                                + ":\n" + e);
-                    }
-                }
-                if (frames.size() > 0) {
-                    for (ByteBuffer buf : frames) {
-                        writer.nextFrame(buf);
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Flushed old frame (from previous failed execution) : " + buf
-                                    + " on behalf of " + nodePushable.getDisplayName());
-                        }
-                    }
-                    frames.clear();
-                }
-                break;
-            case STORE:
-
-                /* TODO:
-                 * Limit the in-memory space utilized during the STORE mode. The limit (expressed in bytes) 
-                 * is a parameter specified as part of the feed ingestion policy. Below is a basic implemenation
-                 * that allocates a buffer on demand.   
-                 * */
-
-                ByteBuffer storageBuffer = ByteBuffer.allocate(buffer.capacity());
-                storageBuffer.put(buffer);
-                frames.add(storageBuffer);
-                storageBuffer.flip();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Stored frame for " + nodePushable.getDisplayName());
-                }
-                break;
-        }
-    }
-
-    /**
-     * Detects if the operator is unable to push a frame downstream
-     * within a threshold period of time. In addition, it measure the
-     * throughput as observed on the output channel of the associated operator.
-     */
-    private static class HealthMonitor extends TimerTask {
-
-        private static final String EOL = "\n";
-
-        private long startTime = -1;
-        private FramePushState state;
-        private AtomicLong numTuplesInInterval = new AtomicLong(0);
-        private boolean collectThroughput;
-        private FeedMessageService mesgService;
-
-        private final FeedConnectionId feedId;
-        private final String nodeId;
-        private final FeedRuntimeType feedRuntimeType;
-        private final int partition;
-        private final long period;
-        private final FrameTupleAccessor fta;
-        private final IFeedManager feedManager;
-
-        public HealthMonitor(FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
-                Timer timer, FrameTupleAccessor fta, IFeedManager feedManager) {
-            this.state = FramePushState.INTIALIZED;
-            this.feedId = feedId;
-            this.nodeId = nodeId;
-            this.feedRuntimeType = feedRuntimeType;
-            this.partition = partition;
-            this.period = FLUSH_THRESHOLD_TIME;
-            this.collectThroughput = feedRuntimeType.equals(FeedRuntimeType.INGESTION);
-            this.fta = fta;
-            this.feedManager = feedManager;
-        }
-
-        public void notifyStartFrameFlushActivity() {
-            startTime = System.currentTimeMillis();
-            state = FramePushState.WAITING_FOR_FLUSH_COMPLETION;
-        }
-
-        /**
-         * Reset method is invoked when a live instance of operator needs to take
-         * over from the zombie instance from the previously failed execution
-         */
-        public void reset() {
-            mesgService = null;
-            collectThroughput = feedRuntimeType.equals(FeedRuntimeType.INGESTION);
-        }
-
-        public void notifyFinishFrameFlushActivity() {
-            state = FramePushState.WAITNG_FOR_NEXT_FRAME;
-            numTuplesInInterval.set(numTuplesInInterval.get() + fta.getTupleCount());
-        }
-
-        @Override
-        public void run() {
-            if (state.equals(FramePushState.WAITING_FOR_FLUSH_COMPLETION)) {
-                long currentTime = System.currentTimeMillis();
-                if (currentTime - startTime > FLUSH_THRESHOLD_TIME) {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
-                    }
-                    sendReportToSuperFeedManager(currentTime - startTime, FeedReportMessageType.CONGESTION,
-                            System.currentTimeMillis());
-                }
-            }
-            if (collectThroughput) {
-                int instantTput = (int) Math.ceil((((double) numTuplesInInterval.get() * 1000) / period));
-                sendReportToSuperFeedManager(instantTput, FeedReportMessageType.THROUGHPUT, System.currentTimeMillis());
-            }
-            numTuplesInInterval.set(0);
-        }
-
-        private void sendReportToSuperFeedManager(long value, SuperFeedManager.FeedReportMessageType mesgType,
-                long timestamp) {
-            if (mesgService == null) {
-                waitTillMessageServiceIsUp();
-            }
-            String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
-            String message = mesgType.name().toLowerCase() + FeedMessageService.MessageSeparator + feedRep
-                    + FeedMessageService.MessageSeparator + feedRuntimeType + FeedMessageService.MessageSeparator
-                    + partition + FeedMessageService.MessageSeparator + value + FeedMessageService.MessageSeparator
-                    + nodeId + FeedMessageService.MessageSeparator + timestamp + FeedMessageService.MessageSeparator
-                    + EOL;
-            try {
-                mesgService.sendMessage(message);
-            } catch (IOException ioe) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to send feed report to Super Feed Manager for feed " + feedId + " "
-                            + feedRuntimeType + "[" + partition + "]");
-                }
-            }
-        }
-
-        private void waitTillMessageServiceIsUp() {
-            while (mesgService == null) {
-                mesgService = feedManager.getFeedMessageService(feedId);
-                if (mesgService == null) {
-                    try {
-                        /**
-                         * wait for the message service to be available
-                         */
-                        Thread.sleep(2000);
-                    } catch (InterruptedException e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Encountered an interrupted exception " + " Exception " + e);
-                        }
-                    }
-                }
-            }
-        }
-
-        public void deactivate() {
-            // cancel the timer task to avoid future execution. 
-            cancel();
-            collectThroughput = false;
-        }
-
-        private enum FramePushState {
-            /**
-             * Frame writer has been initialized
-             */
-            INTIALIZED,
-
-            /**
-             * Frame writer is waiting for a pending flush to finish.
-             */
-            WAITING_FOR_FLUSH_COMPLETION,
-
-            /**
-             * Frame writer is waiting to be given the next frame.
-             */
-            WAITNG_FOR_NEXT_FRAME
-        }
-
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-        if(healthMonitor != null) {
-            if (!healthMonitor.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
-              healthMonitor.deactivate();
-            } else {
-              healthMonitor.reset();
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (healthMonitor != null) {
-            healthMonitor.deactivate();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Closing frame statistics collection activity" + healthMonitor);
-            }
-        }
-        writer.close();
-    }
-
-    public IFrameWriter getWriter() {
-        return writer;
-    }
-
-    public void setWriter(IFrameWriter writer) {
-        this.writer = writer;
-    }
-
-    @Override
-    public String toString() {
-        return "MaterializingFrameWriter using " + writer;
-    }
-
-    public List<ByteBuffer> getStoredFrames() {
-        return frames;
-    }
-
-    public void clear() {
-        frames.clear();
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        writer.open();
-    }
-
-    public void reset() {
-        healthMonitor.reset();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 2b05989..1282f85 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -15,194 +15,117 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.Map;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
 import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 
 /**
- * FeedIntakeOperatorDescriptor is responsible for ingesting data from an external source. This
- * operator uses a user specified for a built-in adapter for retrieving data from the external
- * data source.
+ * An operator responsible for establishing a connection with an external data source and for
+ * parsing and translating the received content. It uses an instance of a feed adaptor to perform these functions.
  */
 public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
-
-    /** The type associated with the ADM data output from the feed adapter */
-    private final IAType outputType;
 
-    /** unique identifier for a feed instance. */
-    private final FeedConnectionId feedId;
+    private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
 
-    /** Map representation of policy parameters */
-    private final Map<String, String> feedPolicy;
+    /** The unique identifier of the feed that is being ingested. **/
+    private final FeedId feedId;
 
-    /** The adapter factory that is used to create an instance of the feed adapter **/
-    private IAdapterFactory adapterFactory;
+    private final FeedPolicyAccessor policyAccessor;
 
-    /** The (singleton) instance of IFeedManager **/
-    private IFeedManager feedManager;
+    /** The adaptor factory that is used to create an instance of the feed adaptor **/
+    private IFeedAdapterFactory adaptorFactory;
 
     /** The library that contains the adapter in use. **/
-    private String adapterLibraryName;
+    private String adaptorLibraryName;
 
     /**
      * The adapter factory class that is used to create an instance of the feed adapter.
      * This value is used only in the case of external adapters.
      **/
-    private String adapterFactoryClassName;
+    private String adaptorFactoryClassName;
 
     /** The configuration parameters associated with the adapter. **/
-    private Map<String, String> adapterConfiguration;
+    private Map<String, String> adaptorConfiguration;
 
     private ARecordType adapterOutputType;
 
-    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
-            ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed,
+            IFeedAdapterFactory adapterFactory, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
         super(spec, 0, 1);
-        recordDescriptors[0] = rDesc;
-        this.adapterFactory = adapterFactory;
-        this.outputType = atype;
-        this.feedId = feedId;
-        this.feedPolicy = feedPolicy;
+        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.adaptorFactory = adapterFactory;
+        this.adapterOutputType = adapterOutputType;
+        this.policyAccessor = policyAccessor;
     }
 
-    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, String adapterLibraryName,
-            String adapterFactoryClassName, Map<String, String> configuration, ARecordType atype,
-            RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, PrimaryFeed primaryFeed, String adapterLibraryName,
+            String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
         super(spec, 0, 1);
-        recordDescriptors[0] = rDesc;
-        this.adapterFactoryClassName = adapterFactoryClassName;
-        this.adapterConfiguration = configuration;
-        this.adapterLibraryName = adapterLibraryName;
-        this.outputType = atype;
-        this.feedId = feedId;
-        this.feedPolicy = feedPolicy;
+        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.adaptorFactoryClassName = adapterFactoryClassName;
+        this.adaptorLibraryName = adapterLibraryName;
+        this.adaptorConfiguration = primaryFeed.getAdaptorConfiguration();
+        this.adapterOutputType = adapterOutputType;
+        this.policyAccessor = policyAccessor;
     }
 
+    @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-            throws HyracksDataException {
-        IFeedAdapter adapter = null;
-        FeedRuntimeId feedRuntimeId = new FeedRuntimeId(feedId, FeedRuntimeType.INGESTION, partition);
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedRuntime(feedRuntimeId);
-        try {
-            if (ingestionRuntime == null) {
-                // create an instance of a feed adapter to ingest data.
-                adapter = createAdapter(ctx, partition);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Beginning new feed:" + feedId);
-                }
-            } else {
-                // retrieve the instance of the feed adapter used in previous failed execution.
-                adapter = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager().getFeedAdapter();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Resuming old feed:" + feedId);
-                }
+        IFeedSubscriptionManager feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
+        SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+                partition);
+        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
+                .getSubscribableRuntime(feedIngestionId);
+        if (adaptorFactory == null) {
+            try {
+                adaptorFactory = createExternalAdapterFactory(ctx, partition);
+            } catch (Exception exception) {
+                throw new HyracksDataException(exception);
             }
-        } catch (Exception exception) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.severe("Initialization of the feed adapter failed with exception " + exception);
-            }
-            throw new HyracksDataException("Initialization of the feed adapter failed", exception);
-        }
-        return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, feedPolicy, partition, ingestionRuntime);
-    }
-
-    public FeedConnectionId getFeedId() {
-        return feedId;
-    }
-
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    public IAdapterFactory getAdapterFactory() {
-        return adapterFactory;
-    }
-
-    public IAType getOutputType() {
-        return outputType;
-    }
 
-    public RecordDescriptor getRecordDescriptor() {
-        return recordDescriptors[0];
+        }
+        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
+                policyAccessor);
     }
 
-    private IFeedAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        IFeedAdapter feedAdapter = null;
-        if (adapterFactory != null) {
-            feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+    private IFeedAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
+        IFeedAdapterFactory adapterFactory = null;
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+                adaptorLibraryName);
+        if (classLoader != null) {
+            adapterFactory = ((IFeedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()));
+            adapterFactory.configure(adaptorConfiguration, adapterOutputType);
         } else {
-            ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
-                    adapterLibraryName);
-            if (classLoader != null) {
-                IAdapterFactory adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adapterFactoryClassName)
-                        .newInstance()));
-
-                switch (adapterFactory.getAdapterType()) {
-                    case TYPED: {
-                        ((ITypedAdapterFactory) adapterFactory).configure(adapterConfiguration);
-                        feedAdapter = (IFeedAdapter) ((ITypedAdapterFactory) adapterFactory).createAdapter(ctx,
-                                partition);
-                    }
-                        break;
-                    case GENERIC: {
-                        String outputTypeName = adapterConfiguration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
-                        if (outputTypeName == null) {
-                            throw new IllegalArgumentException(
-                                    "You must specify the datatype associated with the incoming data. Datatype is specified by the "
-                                            + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
-                        }
-                        ((IGenericAdapterFactory) adapterFactory).configure(adapterConfiguration,
-                                (ARecordType) adapterOutputType);
-                        ((IGenericAdapterFactory) adapterFactory).createAdapter(ctx, partition);
-                    }
-                        break;
-                }
-
-                feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
-            } else {
-                String message = "Unable to create adapter as class loader not configured for library "
-                        + adapterLibraryName + " in dataverse " + feedId.getDataverse();
-                if (LOGGER.isLoggable(Level.SEVERE)) {
-                    LOGGER.severe(message);
-                }
-                throw new IllegalArgumentException(message);
-
-            }
+            String message = "Unable to create adapter as class loader not configured for library "
+                    + adaptorLibraryName + " in dataverse " + feedId.getDataverse();
+            LOGGER.severe(message);
+            throw new IllegalArgumentException(message);
         }
-        return feedAdapter;
-    }
-
-    public String getAdapterLibraryName() {
-        return adapterLibraryName;
+        return adapterFactory;
     }
 
-    public String getAdapterFactoryClassName() {
-        return adapterFactoryClassName;
+    public FeedId getFeedId() {
+        return feedId;
     }
 
-    public Map<String, String> getAdapterConfiguration() {
-        return adapterConfiguration;
-    }
 }
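
After this change the intake descriptor is built from a PrimaryFeed plus either a pre-configured IFeedAdapterFactory or the (library name, factory class name) pair of an externally packaged adaptor; in the latter case the factory is instantiated lazily inside createPushRuntime via createExternalAdapterFactory. A minimal construction sketch, not part of the patch; the library and class names are hypothetical, and primaryFeed, adapterFactory, adapterOutputType, and policyAccessor are assumed to be supplied by the compiler/metadata layer:

    // Sketch only; arguments other than the spec are assumed to come from the layer
    // that translates a feed connection request into a Hyracks job.
    JobSpecification spec = new JobSpecification();
    FeedIntakeOperatorDescriptor intakeOp = new FeedIntakeOperatorDescriptor(spec, primaryFeed,
            adapterFactory, adapterOutputType, policyAccessor);
    // or, for an adaptor shipped in an installed external library:
    FeedIntakeOperatorDescriptor externalIntakeOp = new FeedIntakeOperatorDescriptor(spec, primaryFeed,
            "feedlib", "org.example.MyAdapterFactory", adapterOutputType, policyAccessor);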

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index a6b97eb..3256420 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -14,133 +14,196 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter.DataExchangeMode;
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager.State;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.common.feeds.api.ISubscriberRuntime;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 /**
- * The runtime for @see{FeedIntakeOperationDescriptor}
+ * The runtime for @see{FeedIntakeOperatorDescriptor}.
+ * Provides the core functionality to set up the artifacts for ingestion of a feed.
+ * The artifacts are lazily activated when a feed receives a subscription request.
  */
 public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
 
     private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
 
+    private final FeedId feedId;
     private final int partition;
-    private final FeedConnectionId feedId;
-    private final LinkedBlockingQueue<IFeedMessage> inbox;
-    private final Map<String, String> feedPolicy;
-    private final FeedPolicyEnforcer policyEnforcer;
-    private final String nodeId;
-    private final FrameTupleAccessor fta;
+    private final IFeedSubscriptionManager feedSubscriptionManager;
     private final IFeedManager feedManager;
+    private final IHyracksTaskContext ctx;
+    private final IFeedAdapterFactory adapterFactory;
+    private final FeedPolicyAccessor policyAccessor;
 
-    private FeedRuntime ingestionRuntime;
+    private IngestionRuntime ingestionRuntime;
     private IFeedAdapter adapter;
-    private FeedFrameWriter feedFrameWriter;
+    private IIntakeProgressTracker tracker;
+    private DistributeFeedFrameWriter feedFrameWriter;
 
-    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedAdapter adapter,
-            Map<String, String> feedPolicy, int partition, IngestionRuntime ingestionRuntime) {
-        this.adapter = adapter;
-        this.partition = partition;
+    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IFeedAdapterFactory adapterFactory,
+            int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor) {
+        this.ctx = ctx;
         this.feedId = feedId;
+        this.partition = partition;
         this.ingestionRuntime = ingestionRuntime;
-        inbox = new LinkedBlockingQueue<IFeedMessage>();
-        this.feedPolicy = feedPolicy;
-        policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
-        nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
-        fta = new FrameTupleAccessor(recordDesc);
+        this.adapterFactory = adapterFactory;
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
+        this.feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
         this.feedManager = runtimeCtx.getFeedManager();
+        this.policyAccessor = policyAccessor;
     }
 
     @Override
     public void initialize() throws HyracksDataException {
-
-        AdapterRuntimeManager adapterRuntimeMgr = null;
+        IAdapterRuntimeManager adapterRuntimeManager = null;
         try {
             if (ingestionRuntime == null) {
-                feedFrameWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
-                        FeedRuntimeType.INGESTION, partition, fta, feedManager);
-                adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, feedFrameWriter, partition, inbox,
-                        feedManager);
-
-                if (adapter.getDataExchangeMode().equals(DataExchangeMode.PULL)
-                        && adapter instanceof IPullBasedFeedAdapter) {
-                    ((IPullBasedFeedAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
-                }
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Beginning new feed:" + feedId);
+                try {
+                    adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+                    if (adapterFactory.isRecordTrackingEnabled()) {
+                        tracker = adapterFactory.createIntakeProgressTracker();
+                    }
+                } catch (Exception e) {
+                    LOGGER.severe("Unable to create adapter : " + adapterFactory.getName() + "[" + partition + "]"
+                            + " Exception " + e);
+                    throw new HyracksDataException(e);
                 }
+                FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+                feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition, fta,
+                        feedManager);
+                adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
+                SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+                        partition);
+                ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
+                        adapterRuntimeManager);
+                feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 feedFrameWriter.open();
-                adapterRuntimeMgr.start();
             } else {
-                adapterRuntimeMgr = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Resuming old feed:" + feedId);
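+                // An ingestion runtime for this feed already exists at this location; it may only be resumed if it is currently inactive.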
+                if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+                    ingestionRuntime.getAdapterRuntimeManager().setState(State.ACTIVE_INGESTION);
+                    adapter = ingestionRuntime.getAdapterRuntimeManager().getFeedAdapter();
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info(" Switching to " + State.ACTIVE_INGESTION + " for ingestion runtime "
+                                + ingestionRuntime);
+                        LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+                                + " connected to backend for feed " + feedId);
+                    }
+                    feedFrameWriter = (DistributeFeedFrameWriter) ingestionRuntime.getFeedFrameWriter();
+                } else {
+                    String message = "Feed Ingestion Runtime for feed " + feedId
+                            + " is already registered and active.";
+                    LOGGER.severe(message);
+                    throw new IllegalStateException(message);
                 }
-                adapter = adapterRuntimeMgr.getFeedAdapter();
-                writer.open();
-                adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
-                adapterRuntimeMgr.getAdapterExecutor().getWriter().reset();
-                adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
-                feedFrameWriter = adapterRuntimeMgr.getAdapterExecutor().getWriter();
             }
 
-            ingestionRuntime = adapterRuntimeMgr.getIngestionRuntime();
-            synchronized (adapterRuntimeMgr) {
-                while (!adapterRuntimeMgr.getState().equals(State.FINISHED_INGESTION)) {
-                    adapterRuntimeMgr.wait();
-                }
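+            // Block until the adapter reports that ingestion has finished or failed.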
+            waitTillIngestionIsOver(adapterRuntimeManager);
+            feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime
+                    .getRuntimeId());
+            if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
+                throw new HyracksDataException("Unable to ingest data");
             }
-            feedManager.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
-            feedFrameWriter.close();
+
         } catch (InterruptedException ie) {
-            if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Continuing on failure as per feed policy, switching to INACTIVE INGESTION temporarily");
+            /*
+             * An InterruptedException is thrown if the intake job cannot progress further because another node involved
+             * in the Hyracks job has failed. As the intake job consists solely of intake operators, the exception
+             * indicates a failure at a sibling intake operator's location. The surviving intake partitions must continue
+             * to live and receive data from the external source.
+             */
+            List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
+            FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
+            boolean needToHandleFailure = false;
+            List<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
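+            // Sort the surviving subscribers by policy: those that do not tolerate hardware failure are unsubscribed below, the rest keep ingestion alive.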
+            for (ISubscriberRuntime subscriber : subscribers) {
+                policyAccessor.reset(subscriber.getFeedPolicy());
+                if (!policyAccessor.continueOnHardwareFailure()) {
+                    failingSubscribers.add(subscriber);
+                } else {
+                    needToHandleFailure = true;
                 }
-                adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
-                FeedRuntimeManager runtimeMgr = feedManager.getFeedRuntimeManager(feedId);
+            }
+
+            for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
                 try {
-                    runtimeMgr.close(false);
-                } catch (IOException ioe) {
+                    ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
+                } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to close Feed Runtime Manager " + ioe.getMessage());
+                        LOGGER.warning("Exception in unsubscribing " + failingSubscriber + ": " + e.getMessage());
                     }
                 }
-                feedFrameWriter.fail();
+            }
+
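+            // If at least one subscriber is configured to continue past the failure, keep the ingestion runtime around in an inactive state so it can be resumed.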
+            if (needToHandleFailure) {
+                ingestionRuntime.getAdapterRuntimeManager().setState(State.INACTIVE_INGESTION);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Switching to " + State.INACTIVE_INGESTION + " on occurrence of failure.");
+                }
             } else {
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Interrupted Exception, something went wrong");
+                    LOGGER.info("InterruptedException received; no subscriber requires failure handling. Shutting down feed ingestion.");
                 }
-
-                feedManager.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
-                feedFrameWriter.close();
+                feedSubscriptionManager.deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime
+                        .getRuntimeId());
                 throw new HyracksDataException(ie);
             }
         } catch (Exception e) {
             e.printStackTrace();
             throw new HyracksDataException(e);
+        } finally {
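+            // Close the frame writer only when ingestion is truly over; an inactive runtime stays open so it can resume once the failure is resolved.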
+            if (ingestionRuntime != null
+                    && !ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+                feedFrameWriter.close();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Closed Frame Writer " + feedFrameWriter + " adapter state "
+                            + ingestionRuntime.getAdapterRuntimeManager().getState());
+                }
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ending intake operator node pushable in state " + State.INACTIVE_INGESTION
+                            + ". Will resume after the failure is corrected.");
+                }
+            }
+
         }
     }
 
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
+    private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Waiting for adaptor [" + partition + "] to be done with ingestion of feed " + feedId);
+        }
+        synchronized (adapterRuntimeManager) {
+            while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION) || (adapterRuntimeManager
+                    .getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
+                adapterRuntimeManager.wait();
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+                    + " done with ingestion of feed " + feedId);
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
new file mode 100644
index 0000000..098d713
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
+
+public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber {
+
+    private LinkedBlockingQueue<FeedLifecycleEvent> inbox;
+
+    public FeedLifecycleEventSubscriber() {
+        this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>();
+    }
+
+    @Override
+    public void handleFeedEvent(FeedLifecycleEvent event) {
+        inbox.add(event);
+    }
+
+    @Override
+    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException {
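+        // Check events that have already arrived; if the expected event is not among them, block on the inbox until it (or a failure event) shows up.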
+        boolean eventOccurred = false;
+        FeedLifecycleEvent e = null;
+        Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator();
+        while (eventsSoFar.hasNext()) {
+            e = eventsSoFar.next();
+            assertNoFailure(e);
+            eventOccurred = e.equals(event);
+            if (eventOccurred) {
+                // Stop scanning once the expected event is found so that a later event does not overwrite the result.
+                break;
+            }
+        }
+
+        while (!eventOccurred) {
+            e = inbox.take();
+            eventOccurred = e.equals(event);
+            if (!eventOccurred) {
+                assertNoFailure(e);
+            }
+        }
+    }
+
+    private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException {
+        if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) {
+            throw new AsterixException("Failure in feed");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
deleted file mode 100644
index 8b92994..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedMessageService;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedManager implements IFeedManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
-
-    private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
-    private final String nodeId;
-
-    public FeedManager(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
-        return feedRuntimeManagers.get(feedId);
-    }
-
-    public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
-        FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
-        return mgr == null ? null : mgr.getExecutorService();
-    }
-
-    @Override
-    public FeedMessageService getFeedMessageService(FeedConnectionId feedId) {
-        FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
-        return mgr == null ? null : mgr.getMessageService();
-    }
-
-    @Override
-    public void deregisterFeed(FeedConnectionId feedId) {
-        try {
-            FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
-            if (mgr == null) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unknown feed id: " + feedId);
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Closing feed runtime manager: " + mgr);
-                }
-                mgr.close(true);
-            }
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
-            }
-            e.printStackTrace();
-        }
-
-        feedRuntimeManagers.remove(feedId);
-    }
-
-    @Override
-    public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception {
-        FeedConnectionId feedId = feedRuntime.getFeedRuntimeId().getFeedId();
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
-        if (runtimeMgr == null) {
-            synchronized (feedRuntimeManagers) {
-                if (runtimeMgr == null) {
-                    runtimeMgr = new FeedRuntimeManager(feedId, this);
-                    feedRuntimeManagers.put(feedId, runtimeMgr);
-                }
-            }
-        }
-
-        runtimeMgr.registerFeedRuntime(feedRuntime.getFeedRuntimeId(), feedRuntime);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Registered runtime " + feedRuntime + " for feed " + feedId);
-        }
-    }
-
-    @Override
-    public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
-        if (runtimeMgr != null) {
-            runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Deregistered Feed Runtime " + feedRuntimeId);
-            }
-        }
-    }
-
-    @Override
-    public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
-        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
-    }
-
-    @Override
-    public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
-        if (runtimeMgr != null) {
-            runtimeMgr.setSuperFeedManager(sfm);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered Super Feed Manager " + sfm);
-            }
-        }
-    }
-
-    @Override
-    public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
-        return runtimeMgr != null ? runtimeMgr.getSuperFeedManager() : null;
-    }
-
-    @Override
-    public String toString() {
-        return "FeedManager " + "[" + nodeId + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index 9b00322..ca50c83 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -29,20 +30,20 @@ public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperato
 
     private static final long serialVersionUID = 1L;
 
-    private final FeedConnectionId feedId;
+    private final FeedConnectionId connectionId;
     private final IFeedMessage feedMessage;
 
-    public FeedMessageOperatorDescriptor(JobSpecification spec, String dataverse, String feedName, String dataset,
+    public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
             IFeedMessage feedMessage) {
         super(spec, 0, 1);
-        this.feedId = new FeedConnectionId(dataverse, feedName, dataset);
+        this.connectionId = connectionId;
         this.feedMessage = feedMessage;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessage, partition, nPartitions);
+        return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition, nPartitions);
     }
 
 }