Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:04 UTC

[10/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
new file mode 100644
index 0000000..3cb5d64
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorNodePushable.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedCollectRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedRuntimeManager;
+import org.apache.asterix.external.feed.message.EndFeedMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
+import org.apache.asterix.external.feed.message.PrepareStallMessage;
+import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.feed.watch.IntakePartitionStatistics;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import org.apache.asterix.external.feed.watch.StorageSideMonitoredBuffer;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * Runtime for the FeedMessageOperatorDescriptor. This operator is responsible for communicating
+ * a feed message to the local feed manager on the host node controller.
+ * @see FeedMessageOperatorDescriptor
+ *      IFeedMessage
+ *      IFeedManager
+ */
+public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
+
+    private final FeedConnectionId connectionId;
+    private final IFeedMessage message;
+    private final IFeedManager feedManager;
+    private final int partition;
+
+    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
+            IFeedMessage feedMessage, int partition, int nPartitions) {
+        this.connectionId = connectionId;
+        this.message = feedMessage;
+        this.partition = partition;
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        try {
+            writer.open();
+            switch (message.getMessageType()) {
+                case END:
+                    EndFeedMessage endFeedMessage = (EndFeedMessage) message;
+                    switch (endFeedMessage.getEndMessageType()) {
+                        case DISCONNECT_FEED:
+                            handleDisconnectFeedTypeMessage(endFeedMessage);
+                            break;
+                        case DISCONTINUE_SOURCE:
+                            handleDiscontinueFeedTypeMessage(endFeedMessage);
+                            break;
+                    }
+                    break;
+                case PREPARE_STALL: {
+                    handlePrepareStallMessage((PrepareStallMessage) message);
+                    break;
+                }
+                case TERMINATE_FLOW: {
+                    FeedConnectionId connectionId = ((TerminateDataFlowMessage) message).getConnectionId();
+                    handleTerminateFlowMessage(connectionId);
+                    break;
+                }
+                case COMMIT_ACK_RESPONSE: {
+                    handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage) message);
+                    break;
+                }
+                case THROTTLING_ENABLED: {
+                    handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage) message);
+                    break;
+                }
+                default:
+                    break;
+
+            }
+
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
+
+    private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
+        FeedConnectionId connectionId = throttlingMessage.getConnectionId();
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+        for (FeedRuntimeId runtimeId : runtimes) {
+            if (runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.STORE)) {
+                FeedRuntime storeRuntime = runtimeManager.getFeedRuntime(runtimeId);
+                ((StorageSideMonitoredBuffer) (storeRuntime.getInputHandler().getmBuffer())).setAcking(false);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Acking disabled in view of throttling that has been activated upfront in the pipeline "
+                            + connectionId);
+                }
+            }
+        }
+    }
+
+    private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
+        FeedConnectionId connectionId = commitResponseMessage.getConnectionId();
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
+        for (FeedRuntimeId runtimeId : runtimes) {
+            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+            switch (runtimeId.getFeedRuntimeType()) {
+                case COLLECT:
+                    FeedCollectRuntimeInputHandler inputHandler = (FeedCollectRuntimeInputHandler) runtime
+                            .getInputHandler();
+                    int maxBasePersisted = commitResponseMessage.getMaxWindowAcked();
+                    inputHandler.dropTill(IntakePartitionStatistics.ACK_WINDOW_SIZE * (maxBasePersisted + 1));
+                    break;
+                case STORE:
+                    MonitoredBufferStorageTimerTask sTask = runtime.getInputHandler().getmBuffer()
+                            .getStorageTimeTrackingRateTask();
+                    sTask.receiveCommitAckResponse(commitResponseMessage);
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        commitResponseMessage.getIntakePartition();
+        SubscribableFeedRuntimeId sid = new SubscribableFeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.INTAKE,
+                partition);
+        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedSubscriptionManager()
+                .getSubscribableRuntime(sid);
+        if (ingestionRuntime != null) {
+            IIntakeProgressTracker tracker = ingestionRuntime.getAdapterRuntimeManager().getProgressTracker();
+            if (tracker != null) {
+                tracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
+            }
+        }
+    }
+
+    private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+
+        boolean found = false;
+        for (FeedRuntimeId runtimeId : feedRuntimes) {
+            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+            if (runtime.getRuntimeId().getRuntimeType().equals(FeedRuntimeType.COLLECT)) {
+                ((CollectionRuntime) runtime).getFrameCollector().setState(State.HANDOVER);
+                found = true;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Switched " + runtime + " to Hand Over stage");
+                }
+            }
+        }
+        if (!found) {
+            throw new HyracksDataException("COLLECT runtime not found!");
+        }
+    }
+
+    private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
+        FeedConnectionId connectionId = prepareStallMessage.getConnectionId();
+        int computePartitionsRetainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
+        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
+        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
+        for (FeedRuntimeId runtimeId : feedRuntimes) {
+            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
+            switch (runtimeId.getFeedRuntimeType()) {
+                case COMPUTE:
+                    Mode requiredMode = runtimeId.getPartition() <= computePartitionsRetainLimit ? Mode.STALL
+                            : Mode.END;
+                    runtime.setMode(requiredMode);
+                    break;
+                default:
+                    runtime.setMode(Mode.STALL);
+                    break;
+            }
+        }
+    }
+
+    private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+        FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
+        SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                FeedRuntimeType.INTAKE, partition);
+        ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
+                .getSubscribableRuntime(subscribableRuntimeId);
+        IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager();
+        adapterRuntimeManager.stop();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
+        }
+    }
+
+    private void handleDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
+        }
+        FeedRuntimeId runtimeId = null;
+        FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType();
+        if (endFeedMessage.isCompleteDisconnection()) {
+            // subscribableRuntimeType represents the location at which the feed connection receives data
+            FeedRuntimeType runtimeType = null;
+            switch (subscribableRuntimeType) {
+                case INTAKE:
+                    runtimeType = FeedRuntimeType.COLLECT;
+                    break;
+                case COMPUTE:
+                    runtimeType = FeedRuntimeType.COMPUTE_COLLECT;
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
+            }
+
+            runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
+            CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
+                    .getFeedRuntime(connectionId, runtimeId);
+            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
+            }
+        } else {
+            // subscribableRuntimeType represents the location for data hand-off in the presence of subscribers
+            switch (subscribableRuntimeType) {
+                case INTAKE:
+                    // illegal state as data hand-off from one feed to another does not happen at intake
+                    throw new IllegalStateException("Illegal state, invalid runtime type " + subscribableRuntimeType);
+                case COMPUTE:
+                    // feed could be primary or secondary, doesn't matter
+                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(
+                            connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
+                    ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
+                            .getSubscribableRuntime(feedSubscribableRuntimeId);
+                    DistributeFeedFrameWriter dWriter = feedRuntime.getFeedFrameWriter();
+                    Map<IFrameWriter, FeedFrameCollector> registeredCollectors = dWriter.getRegisteredReaders();
+
+                    IFrameWriter unsubscribingWriter = null;
+                    for (Entry<IFrameWriter, FeedFrameCollector> entry : registeredCollectors.entrySet()) {
+                        IFrameWriter frameWriter = entry.getKey();
+                        FeedRuntimeInputHandler feedFrameWriter = (FeedRuntimeInputHandler) frameWriter;
+                        if (feedFrameWriter.getConnectionId().equals(endFeedMessage.getFeedConnectionId())) {
+                            unsubscribingWriter = feedFrameWriter;
+                            dWriter.unsubscribeFeed(unsubscribingWriter);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Partial Unsubscription of " + unsubscribingWriter);
+                            }
+                            break;
+                        }
+                    }
+                    break;
+                default:
+                    break;
+            }
+
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Unsubscribed from feed :" + connectionId);
+        }
+    }
+}
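
The initialize() method above follows a simple open/dispatch/close discipline: open the downstream frame writer, switch on the message type to pick a handler, and always close the writer in a finally block. Below is a minimal, self-contained sketch of that pattern; the types are illustrative stand-ins, not the AsterixDB classes.

public final class FeedMessageDispatchSketch {

    enum MessageType { END, PREPARE_STALL, TERMINATE_FLOW, COMMIT_ACK_RESPONSE, THROTTLING_ENABLED }

    interface FeedMessage { MessageType getMessageType(); }

    interface FrameWriter { void open(); void close(); }

    static void handle(FeedMessage message, FrameWriter writer) {
        try {
            writer.open();
            switch (message.getMessageType()) {
                case END:                 /* disconnect or discontinue the source feed */ break;
                case PREPARE_STALL:       /* stall or end downstream runtimes */ break;
                case TERMINATE_FLOW:      /* hand the collect runtime over to a new pipeline */ break;
                case COMMIT_ACK_RESPONSE: /* drop frames that have already been persisted */ break;
                case THROTTLING_ENABLED:  /* disable acking on the storage-side buffer */ break;
                default: break;
            }
        } finally {
            writer.close(); // always close the writer, as in the operator above
        }
    }

    public static void main(String[] args) {
        FrameWriter writer = new FrameWriter() {
            public void open()  { System.out.println("writer opened"); }
            public void close() { System.out.println("writer closed"); }
        };
        handle(() -> MessageType.PREPARE_STALL, writer);
    }
}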

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
new file mode 100644
index 0000000..80a54be
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.SubscribableRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/*
+ * This IFrameWriter doesn't follow the contract
+ */
+public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
+
+    /** Runtime node pushable corresponding to the core feed operator **/
+    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+    /**
+     * A policy enforcer that ensures dynamic decisions for a feed are taken
+     * in accordance with the associated ingestion policy
+     **/
+    private FeedPolicyEnforcer policyEnforcer;
+
+    /**
+     * The Feed Runtime instance associated with the operator. Feed Runtime
+     * captures the state of the operator while the feed is active.
+     */
+    private FeedRuntime feedRuntime;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents
+     * the flow of data from a feed to a dataset.
+     **/
+    private FeedConnectionId connectionId;
+
+    /**
+     * Denotes the i'th operator instance in a setting where K operator
+     * instances are scheduled to run in parallel
+     **/
+    private int partition;
+
+    private int nPartitions;
+
+    /** The (singleton) instance of IFeedManager **/
+    private IFeedManager feedManager;
+
+    private FrameTupleAccessor fta;
+
+    private final IHyracksTaskContext ctx;
+
+    private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
+
+    private FeedRuntimeInputHandler inputSideHandler;
+
+    public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+        this.ctx = ctx;
+        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getFeedManager();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        FeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(connectionId.getFeedId(), runtimeType, partition);
+        try {
+            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+            if (feedRuntime == null) {
+                initializeNewFeedRuntime(runtimeId);
+            } else {
+                reviveOldFeedRuntime(runtimeId);
+            }
+            writer.open();
+            coreOperator.open();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
+
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+
+        feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
+                recordDesc);
+        feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime) feedRuntime);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+
+        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+    }
+
+    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = feedRuntime.getInputHandler();
+        this.inputSideHandler.setCoreOperator(coreOperator);
+
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
+        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
+
+        inputSideHandler.reset(nPartitions);
+        feedRuntime.setMode(Mode.PROCESS);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            inputSideHandler.nextFrame(buffer);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
+        }
+        feedRuntime.setMode(Mode.FAIL);
+        coreOperator.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+        boolean end = inputSideHandler.getMode().equals(Mode.END);
+        try {
+            if (inputSideHandler != null) {
+                if (!(stalled || end)) {
+                    inputSideHandler.nextFrame(null); // signal end of data
+                    while (!inputSideHandler.isFinished()) {
+                        synchronized (coreOperator) {
+                            coreOperator.wait();
+                        }
+                    }
+                } else {
+                    inputSideHandler.setFinished(true);
+                }
+            }
+            coreOperator.close();
+            System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (!stalled) {
+                deregister();
+                System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            } else {
+                System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            }
+            if (inputSideHandler != null) {
+                inputSideHandler.close();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
+            }
+        }
+    }
+
+    private void deregister() {
+        if (feedRuntime != null) {
+            // deregister from subscription manager
+            SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId) feedRuntime.getRuntimeId();
+            feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
+
+            // deregister from connection manager
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+        }
+    }
+
+}
\ No newline at end of file
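
The close() path of FeedMetaComputeNodePushable hands a null frame to the input-side handler to signal end of data, then blocks on the core operator's monitor until the handler reports that it has finished. The following is a hedged, self-contained sketch of that handshake with hypothetical classes; unlike the code above, the sketch checks the finished flag while holding the monitor, which is the usual guard against missed notifications and spurious wakeups.

public final class EndOfDataHandshakeSketch {

    static final class CoreOperator { /* monitor object, stands in for coreOperator */ }

    static final class InputHandler extends Thread {
        private final CoreOperator core;
        private volatile boolean finished;

        InputHandler(CoreOperator core) { this.core = core; }

        void nextFrame(Object frame) {
            if (frame == null) {            // null frame == end-of-data signal
                this.start();               // drain asynchronously, then notify
            }
        }

        boolean isFinished() { return finished; }

        @Override public void run() {
            // ... drain any remaining buffered frames here ...
            synchronized (core) {
                finished = true;
                core.notifyAll();           // wake the thread blocked in close()
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CoreOperator core = new CoreOperator();
        InputHandler handler = new InputHandler(core);
        handler.nextFrame(null);            // signal end of data
        synchronized (core) {
            while (!handler.isFinished()) { // re-check under the monitor
                core.wait();
            }
        }
        System.out.println("core operator can now be closed");
    }
}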

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
new file mode 100644
index 0000000..4dae72d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());
+
+    /** Runtime node pushable corresponding to the core feed operator **/
+    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+    /**
+     * A policy enforcer that ensures dynamic decisions for a feed are taken
+     * in accordance with the associated ingestion policy
+     **/
+    private FeedPolicyEnforcer policyEnforcer;
+
+    /**
+     * The Feed Runtime instance associated with the operator. Feed Runtime
+     * captures the state of the operator while the feed is active.
+     */
+    private FeedRuntime feedRuntime;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents
+     * the flow of data from a feed to a dataset.
+     **/
+    private FeedConnectionId connectionId;
+
+    /**
+     * Denotes the i'th operator instance in a setting where K operator
+     * instances are scheduled to run in parallel
+     **/
+    private int partition;
+
+    /** Total number of partitions available **/
+    private int nPartitions;
+
+    /** Type associated with the core feed operator **/
+    private final FeedRuntimeType runtimeType = FeedRuntimeType.OTHER;
+
+    /** The (singleton) instance of IFeedManager **/
+    private IFeedManager feedManager;
+
+    private FrameTupleAccessor fta;
+
+    private final IHyracksTaskContext ctx;
+
+    private final String operandId;
+
+    /** The pre-processor associated with this runtime **/
+    private FeedRuntimeInputHandler inputSideHandler;
+
+    public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+        this.ctx = ctx;
+        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getFeedManager();
+        this.operandId = operationId;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        try {
+            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+            if (feedRuntime == null) {
+                initializeNewFeedRuntime(runtimeId);
+            } else {
+                reviveOldFeedRuntime(runtimeId);
+            }
+            coreOperator.open();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+                policyEnforcer.getFeedPolicyAccessor(), false, fta, recordDesc, feedManager, nPartitions);
+
+        setupBasicRuntime(inputSideHandler);
+    }
+
+    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.inputSideHandler = feedRuntime.getInputHandler();
+        this.fta = new FrameTupleAccessor(recordDesc);
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        feedRuntime.setMode(Mode.PROCESS);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Retreived state from the zombie instance " + runtimeType + " node.");
+        }
+    }
+
+    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            inputSideHandler.nextFrame(buffer);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+        }
+        feedRuntime.setMode(Mode.FAIL);
+        coreOperator.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            coreOperator.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+            // ignore
+        } finally {
+            if (inputSideHandler != null) {
+                inputSideHandler.close();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
+            }
+        }
+    }
+
+}
\ No newline at end of file
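
FeedMetaNodePushable's open() follows the same lookup-or-create discipline as the other wrappers: ask the connection manager for an existing runtime, revive it if one is found (a leftover from a previous execution), or register a fresh one otherwise. Here is a small sketch of that pattern using an illustrative in-memory registry rather than the real feed connection manager.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class RuntimeRegistrySketch {

    static final class FeedRuntime {
        boolean revived;
        void revive() { revived = true; }   // reuse state from a prior execution
    }

    private final Map<String, FeedRuntime> registry = new ConcurrentHashMap<>();

    FeedRuntime openRuntime(String runtimeId) {
        FeedRuntime existing = registry.get(runtimeId);
        if (existing == null) {
            FeedRuntime fresh = new FeedRuntime();   // initializeNewFeedRuntime(...)
            registry.put(runtimeId, fresh);
            return fresh;
        }
        existing.revive();                            // reviveOldFeedRuntime(...)
        return existing;
    }

    public static void main(String[] args) {
        RuntimeRegistrySketch sketch = new RuntimeRegistrySketch();
        FeedRuntime first = sketch.openRuntime("OTHER:0:0");
        FeedRuntime second = sketch.openRuntime("OTHER:0:0");
        System.out.println(first == second);          // true: the runtime was revived
        System.out.println(second.revived);           // true
    }
}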

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
new file mode 100644
index 0000000..9eb6c78
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * FeedMetaOperatorDescriptor is a wrapper operator that provides a sandbox-like
+ * environment for a Hyracks operator that is part of a feed ingestion
+ * pipeline. The MetaFeed operator provides an interface identical to that
+ * offered by the underlying wrapped operator, hereafter referred to as the core
+ * operator. As seen by Hyracks, the altered pipeline is identical to the
+ * earlier version formed from core operators. The MetaFeed operator enhances
+ * each core operator by providing functionality for handling runtime
+ * exceptions, saving any state for future retrieval, and measuring/reporting
+ * performance characteristics, all of which contribute to fault tolerance.
+ */
+
+public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The actual (Hyracks) operator that is wrapped by the MetaFeed
+     * operator.
+     **/
+    private IOperatorDescriptor coreOperator;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents the
+     * flow of data from a feed to a dataset.
+     **/
+    private final FeedConnectionId feedConnectionId;
+
+    /**
+     * The policy associated with the feed instance.
+     **/
+    private final Map<String, String> feedPolicyProperties;
+
+    /**
+     * Type of the feed runtime associated with the operator.
+     * Possible values: COMPUTE, STORE, OTHER, ETS, JOIN
+     **/
+    private final FeedRuntimeType runtimeType;
+
+    private final String operandId;
+
+    public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
+            IOperatorDescriptor coreOperatorDescriptor, Map<String, String> feedPolicyProperties,
+            FeedRuntimeType runtimeType, boolean enableSubscriptionMode, String operandId) {
+        super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
+        this.feedConnectionId = feedConnectionId;
+        this.feedPolicyProperties = feedPolicyProperties;
+        if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
+            recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
+        }
+        this.coreOperator = coreOperatorDescriptor;
+        this.runtimeType = runtimeType;
+        this.operandId = operandId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        IOperatorNodePushable nodePushable = null;
+        switch (runtimeType) {
+            case COMPUTE:
+                nodePushable = new FeedMetaComputeNodePushable(ctx, recordDescProvider, partition, nPartitions,
+                        coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+                break;
+            case STORE:
+                nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
+                        coreOperator, feedConnectionId, feedPolicyProperties, operandId);
+                break;
+            case OTHER:
+                nodePushable = new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
+                        feedConnectionId, feedPolicyProperties, operandId);
+                break;
+            case ETS:
+                nodePushable = ((AlgebricksMetaOperatorDescriptor) coreOperator).createPushRuntime(ctx,
+                        recordDescProvider, partition, nPartitions);
+                break;
+            case JOIN:
+                break;
+            default:
+                throw new HyracksDataException(new IllegalArgumentException("Invalid feed runtime: " + runtimeType));
+        }
+        return nodePushable;
+    }
+
+    @Override
+    public String toString() {
+        return "FeedMeta [" + coreOperator + " ]";
+    }
+
+    public IOperatorDescriptor getCoreOperator() {
+        return coreOperator;
+    }
+
+    public FeedRuntimeType getRuntimeType() {
+        return runtimeType;
+    }
+
+}
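
createPushRuntime() above is essentially a factory switch: the wrapper descriptor inspects the feed runtime type and constructs the matching pushable around the core operator, delegating directly to the core operator for ETS. A compact sketch of that dispatch with stand-in types might look as follows.

public final class MetaPushableFactorySketch {

    enum RuntimeType { COMPUTE, STORE, OTHER, ETS, JOIN }

    interface NodePushable { String name(); }

    static NodePushable create(RuntimeType type) {
        switch (type) {
            case COMPUTE: return () -> "FeedMetaComputeNodePushable";
            case STORE:   return () -> "FeedMetaStoreNodePushable";
            case OTHER:   return () -> "FeedMetaNodePushable";
            case ETS:     return () -> "core operator's own push runtime";
            case JOIN:    return null; // mirrors the empty JOIN case above
            default:
                throw new IllegalArgumentException("Invalid feed runtime: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(RuntimeType.STORE).name()); // FeedMetaStoreNodePushable
    }
}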

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
new file mode 100644
index 0000000..f75b3eb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetaStoreNodePushable.class.getName());
+
+    /** Runtime node pushable corresponding to the core feed operator **/
+    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
+
+    /**
+     * A policy enforcer that ensures dynamic decisions for a feed are taken
+     * in accordance with the associated ingestion policy
+     **/
+    private FeedPolicyEnforcer policyEnforcer;
+
+    /**
+     * The Feed Runtime instance associated with the operator. Feed Runtime
+     * captures the state of the operator while the feed is active.
+     */
+    private FeedRuntime feedRuntime;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents
+     * the flow of data from a feed to a dataset.
+     **/
+    private FeedConnectionId connectionId;
+
+    /**
+     * Denotes the i'th operator instance in a setting where K operator
+     * instances are scheduled to run in parallel
+     **/
+    private int partition;
+
+    private int nPartitions;
+
+    /** Type associated with the core feed operator **/
+    private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
+
+    /** The (singleton) instance of IFeedManager **/
+    private IFeedManager feedManager;
+
+    private FrameTupleAccessor fta;
+
+    private final IHyracksTaskContext ctx;
+
+    private final String operandId;
+
+    private FeedRuntimeInputHandler inputSideHandler;
+
+    public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
+        this.ctx = ctx;
+        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
+                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getFeedManager();
+        this.operandId = operationId;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        try {
+            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
+            if (feedRuntime == null) {
+                initializeNewFeedRuntime(runtimeId);
+            } else {
+                reviveOldFeedRuntime(runtimeId);
+            }
+
+            coreOperator.open();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Runtime not found for  " + runtimeId + " connection id " + connectionId);
+        }
+        this.fta = new FrameTupleAccessor(recordDesc);
+        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
+                policyEnforcer.getFeedPolicyAccessor(), policyEnforcer.getFeedPolicyAccessor().bufferingEnabled(), fta,
+                recordDesc, feedManager, nPartitions);
+        if (coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable) {
+            AsterixLSMInsertDeleteOperatorNodePushable indexOp = (AsterixLSMInsertDeleteOperatorNodePushable) coreOperator;
+            if (!indexOp.isPrimary()) {
+                inputSideHandler.setBufferingEnabled(false);
+            }
+        }
+        setupBasicRuntime(inputSideHandler);
+    }
+
+    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
+        this.inputSideHandler = feedRuntime.getInputHandler();
+        this.fta = new FrameTupleAccessor(recordDesc);
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        this.inputSideHandler.reset(nPartitions);
+        this.inputSideHandler.setCoreOperator(coreOperator);
+        feedRuntime.setMode(Mode.PROCESS);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning(
+                    "Retreived state from the zombie instance from previous execution for " + runtimeType + " node.");
+        }
+    }
+
+    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
+        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
+        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
+        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            inputSideHandler.nextFrame(buffer);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
+        }
+        feedRuntime.setMode(Mode.FAIL);
+        coreOperator.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        System.out.println("CLOSE CALLED FOR " + this.feedRuntime.getRuntimeId());
+        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
+        try {
+            if (!stalled) {
+                System.out.println("SIGNALLING END OF DATA for " + this.feedRuntime.getRuntimeId() + " mode is "
+                        + inputSideHandler.getMode() + " WAITING ON " + coreOperator);
+                inputSideHandler.nextFrame(null); // signal end of data
+                while (!inputSideHandler.isFinished()) {
+                    synchronized (coreOperator) {
+                        coreOperator.wait();
+                    }
+                }
+                System.out.println("ABOUT TO CLOSE OPERATOR  " + coreOperator);
+            }
+            coreOperator.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+            // ignore
+        } finally {
+            if (!stalled) {
+                deregister();
+                System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            } else {
+                System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
+            }
+            inputSideHandler.close();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
+            }
+        }
+    }
+
+    private void deregister() {
+        if (feedRuntime != null) {
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
+        }
+    }
+
+}
\ No newline at end of file
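
One detail worth calling out in initializeNewFeedRuntime() above: buffering on the input-side handler starts out governed by the ingestion policy, but it is forced off when the wrapped operator is an insert/delete operator over a secondary index. A hypothetical sketch of that decision, with simplified stand-in types:

public final class StoreBufferingSketch {

    interface CoreOperator { }

    static final class InsertDeleteOperator implements CoreOperator {
        private final boolean primary;
        InsertDeleteOperator(boolean primary) { this.primary = primary; }
        boolean isPrimary() { return primary; }
    }

    static final class InputHandler {
        private boolean bufferingEnabled;
        InputHandler(boolean bufferingEnabled) { this.bufferingEnabled = bufferingEnabled; }
        void setBufferingEnabled(boolean enabled) { this.bufferingEnabled = enabled; }
        boolean isBufferingEnabled() { return bufferingEnabled; }
    }

    static InputHandler configure(CoreOperator core, boolean policyBuffering) {
        InputHandler handler = new InputHandler(policyBuffering);   // policy decides by default
        if (core instanceof InsertDeleteOperator && !((InsertDeleteOperator) core).isPrimary()) {
            handler.setBufferingEnabled(false); // never buffer in front of a secondary index
        }
        return handler;
    }

    public static void main(String[] args) {
        System.out.println(configure(new InsertDeleteOperator(true), true).isBufferingEnabled());  // true
        System.out.println(configure(new InsertDeleteOperator(false), true).isBufferingEnabled()); // false
    }
}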

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 860d35f..129b62f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.parser;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -1145,4 +1146,10 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars
         recordBuilderPool.reset();
         abvsBuilderPool.reset();
     }
+
+    @Override
+    public boolean reset(InputStream in) throws IOException {
+        admLexer.reInit(new InputStreamReader(in));
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 146064a..6c399c3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -198,11 +198,17 @@ public class DelimitedDataParser extends AbstractDataParser implements IStreamDa
     }
 
     @Override
-    public void setInputStream(InputStream in) throws Exception {
+    public void setInputStream(InputStream in) throws IOException {
         cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
         if (in != null && hasHeader) {
             cursor.nextRecord();
             while (cursor.nextField());
         }
     }
+
+    @Override
+    public boolean reset(InputStream in) throws IOException {
+        cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote);
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 649ca43..c5b39df 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -30,10 +30,6 @@ import org.apache.asterix.external.api.IIndexingAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.GenericAdapter;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.library.ExternalLibraryManager;
-import org.apache.asterix.external.runtime.GenericSocketFeedAdapter;
-import org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory;
-import org.apache.asterix.external.runtime.SocketClientAdapter;
-import org.apache.asterix.external.runtime.SocketClientAdapterFactory;
 import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.types.ARecordType;
@@ -47,16 +43,16 @@ public class AdapterFactoryProvider {
         Map<String, Class<? extends IAdapterFactory>> adapterFactories = new HashMap<String, Class<? extends IAdapterFactory>>();
         // Class names
         adapterFactories.put(GenericAdapter.class.getName(), GenericAdapterFactory.class);
-        adapterFactories.put(GenericSocketFeedAdapter.class.getName(), GenericSocketFeedAdapterFactory.class);
-        adapterFactories.put(SocketClientAdapter.class.getName(), SocketClientAdapterFactory.class);
-
         // Aliases
         adapterFactories.put(ExternalDataConstants.ALIAS_GENERIC_ADAPTER, GenericAdapterFactory.class);
         adapterFactories.put(ExternalDataConstants.ALIAS_HDFS_ADAPTER, GenericAdapterFactory.class);
         adapterFactories.put(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, GenericAdapterFactory.class);
-        adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_ADAPTER, GenericSocketFeedAdapterFactory.class);
-        adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER, SocketClientAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_ADAPTER, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER, GenericAdapterFactory.class);
         adapterFactories.put(ExternalDataConstants.ALIAS_FILE_FEED_ADAPTER, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_PULL_ADAPTER, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_PUSH_ADAPTER, GenericAdapterFactory.class);
+        adapterFactories.put(ExternalDataConstants.ALIAS_LOCALFS_PUSH_ADAPTER, GenericAdapterFactory.class);
 
         // Compatability
         adapterFactories.put(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME, GenericAdapterFactory.class);
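
After this hunk every registered alias, including the socket and the new Twitter and local-fs push aliases, maps to GenericAdapterFactory, and the specialized socket factories are no longer registered by class name. Adapter resolution is therefore a plain table lookup, with the concrete behaviour decided later from the adapter's configuration. A rough, self-contained sketch of that lookup shape; the string keys and stand-in class are illustrative, the real keys being the ExternalDataConstants.ALIAS_* constants shown above:

    import java.util.HashMap;
    import java.util.Map;

    class AdapterAliasLookupSketch {

        // Stand-in for the project's GenericAdapterFactory class.
        static class GenericFactoryStandIn {
        }

        // Every alias maps to the same generic factory; the real keys are the
        // ExternalDataConstants.ALIAS_* constants registered in the hunk above.
        static final Map<String, Class<?>> FACTORIES = new HashMap<>();
        static {
            FACTORIES.put("socket-adapter-alias", GenericFactoryStandIn.class);
            FACTORIES.put("twitter-pull-alias", GenericFactoryStandIn.class);
            FACTORIES.put("twitter-push-alias", GenericFactoryStandIn.class);
        }

        static Class<?> resolve(String alias) {
            Class<?> factory = FACTORIES.get(alias);
            if (factory == null) {
                throw new IllegalArgumentException("unknown adapter alias: " + alias);
            }
            return factory;
        }
    }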

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 68a3942..dfe7aed 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -28,14 +28,19 @@ import org.apache.asterix.external.api.IInputStreamProvider;
 import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordFlowController;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.api.IStreamFlowController;
+import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
+import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
@@ -60,9 +65,11 @@ public class DataflowControllerProvider {
             Map<String, String> configuration, boolean indexingOp) throws Exception {
         switch (dataSourceFactory.getDataSourceType()) {
             case RECORDS:
-                RecordDataFlowController recordDataFlowController;
+                IRecordFlowController recordDataFlowController = null;
                 if (indexingOp) {
                     recordDataFlowController = new IndexingDataFlowController();
+                } else if (ExternalDataUtils.isFeed(configuration)) {
+                    recordDataFlowController = new FeedRecordDataFlowController();
                 } else {
                     recordDataFlowController = new RecordDataFlowController();
                 }
@@ -77,7 +84,12 @@ public class DataflowControllerProvider {
                 recordDataFlowController.setRecordParser(dataParser);
                 return recordDataFlowController;
             case STREAM:
-                StreamDataFlowController streamDataFlowController = new StreamDataFlowController();
+                IStreamFlowController streamDataFlowController = null;
+                if (ExternalDataUtils.isFeed(configuration)) {
+                    streamDataFlowController = new FeedStreamDataFlowController();
+                } else {
+                    streamDataFlowController = new StreamDataFlowController();
+                }
                 streamDataFlowController.configure(configuration, ctx);
                 streamDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
                 IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory;
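
Controller selection in DataflowControllerProvider now branches on ExternalDataUtils.isFeed(configuration) for both the RECORDS and STREAM cases. The RECORDS branch, restated as a standalone helper purely for readability (all class names and the isFeed call are taken from the hunk above; the helper signature is not, and the real code keeps this logic inline in the provider's factory method):

    // Simplified restatement of the RECORDS branch above; assumes the imports
    // already present in DataflowControllerProvider.
    static IRecordFlowController chooseRecordController(boolean indexingOp,
            Map<String, String> configuration) {
        if (indexingOp) {
            return new IndexingDataFlowController();
        } else if (ExternalDataUtils.isFeed(configuration)) {
            return new FeedRecordDataFlowController();
        } else {
            return new RecordDataFlowController();
        }
    }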

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index c69e12c..a7ab062 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -27,6 +27,7 @@ import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.factory.TwitterRecordReaderFactory;
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
 import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -92,8 +93,12 @@ public class DatasourceFactoryProvider {
                             .setInputStreamFactoryProvider(DatasourceFactoryProvider.getInputStreamFactory(
                                     ExternalDataUtils.getRecordReaderStreamName(configuration), configuration));;
                     break;
+                case ExternalDataConstants.READER_TWITTER_PULL:
+                case ExternalDataConstants.READER_TWITTER_PUSH:
+                    readerFactory = new TwitterRecordReaderFactory();
+                    break;
                 default:
-                    throw new AsterixException("unknown input stream factory");
+                    throw new AsterixException("unknown record reader factory");
             }
         }
         return readerFactory;
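
READER_TWITTER_PULL and READER_TWITTER_PUSH deliberately share a single TwitterRecordReaderFactory, and the default case now reports "unknown record reader factory" rather than the misleading "unknown input stream factory". Presumably the factory itself decides between pull and push from the configured reader name; that decision is not shown in this patch, so the sketch below is purely an assumption about how such a split could look:

    // Hypothetical only: a pull-vs-push switch resolved inside one factory.
    // The property key and naming convention are illustrative, not from the patch.
    class TwitterReaderModeSketch {
        private boolean pull;

        void configure(java.util.Map<String, String> configuration) {
            String readerName = configuration.get("reader");                    // assumed key
            this.pull = readerName != null && readerName.endsWith("pull");      // assumed convention
        }

        boolean isPull() {
            return pull;
        }
    }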