You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:07 UTC

[13/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
new file mode 100644
index 0000000..0749f82
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class RemoteSocketMessageListener {
+
+    private static final Logger LOGGER = Logger.getLogger(RemoteSocketMessageListener.class.getName());
+
+    private final String host;
+    private final int port;
+    private final LinkedBlockingQueue<String> outbox;
+    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    private RemoteMessageListenerServer listenerServer;
+
+    public RemoteSocketMessageListener(String host, int port, LinkedBlockingQueue<String> outbox) {
+        this.host = host;
+        this.port = port;
+        this.outbox = outbox;
+    }
+
+    public void stop() {
+        if (!executorService.isShutdown()) {
+            executorService.shutdownNow();
+        }
+        listenerServer.stop();
+
+    }
+
+    public void start() throws IOException {
+        listenerServer = new RemoteMessageListenerServer(host, port, outbox);
+        executorService.execute(listenerServer);
+    }
+
+    private static class RemoteMessageListenerServer implements Runnable {
+
+        private final String host;
+        private final int port;
+        private final LinkedBlockingQueue<String> outbox;
+        private Socket client;
+
+        public RemoteMessageListenerServer(String host, int port, LinkedBlockingQueue<String> outbox) {
+            this.host = host;
+            this.port = port;
+            this.outbox = outbox;
+        }
+
+        public void stop() {
+            try {
+                client.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public void run() {
+            char EOL = (char) "\n".getBytes()[0];
+            Socket client = null;
+            try {
+                client = new Socket(host, port);
+                InputStream in = client.getInputStream();
+                CharBuffer buffer = CharBuffer.allocate(5000);
+                char ch;
+                while (true) {
+                    ch = (char) in.read();
+                    if ((ch) == -1) {
+                        break;
+                    }
+                    while (ch != EOL) {
+                        buffer.put(ch);
+                        ch = (char) in.read();
+                    }
+                    buffer.flip();
+                    String s = new String(buffer.array());
+                    synchronized (outbox) {
+                        outbox.add(s + "\n");
+                    }
+                    buffer.position(0);
+                    buffer.limit(5000);
+                }
+
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unable to start Remote Message listener" + client);
+                }
+            } finally {
+                if (client != null && !client.isClosed()) {
+                    try {
+                        client.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    public static interface IMessageAnalyzer {
+
+        /**
+         * @return
+         */
+        public LinkedBlockingQueue<String> getMessageQueue();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java
new file mode 100644
index 0000000..62aba04
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ScaleInReportMessage.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedConstants;
+
+/**
+ * A feed control message indicating the need to scale in a stage of the feed ingestion pipeline.
+ * Currently, scaling-in of the compute stage is supported.
+ **/
+public class ScaleInReportMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+
+    private final FeedRuntimeType runtimeType;
+
+    private int currentCardinality;
+
+    private int reducedCardinaliy;
+
+    public ScaleInReportMessage(FeedConnectionId connectionId, FeedRuntimeType runtimeType, int currentCardinality,
+            int reducedCardinaliy) {
+        super(MessageType.SCALE_IN_REQUEST);
+        this.connectionId = connectionId;
+        this.runtimeType = runtimeType;
+        this.currentCardinality = currentCardinality;
+        this.reducedCardinaliy = reducedCardinaliy;
+    }
+
+    @Override
+    public String toString() {
+        return MessageType.SCALE_IN_REQUEST.name() + "  " + connectionId + " [" + runtimeType + "] "
+                + " currentCardinality " + currentCardinality + " reducedCardinality " + reducedCardinaliy;
+    }
+
+    public FeedRuntimeType getRuntimeType() {
+        return runtimeType;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeType);
+        obj.put(FeedConstants.MessageConstants.CURRENT_CARDINALITY, currentCardinality);
+        obj.put(FeedConstants.MessageConstants.REDUCED_CARDINALITY, reducedCardinaliy);
+        return obj;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public static ScaleInReportMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        FeedRuntimeType runtimeType = FeedRuntimeType.valueOf(obj
+                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE));
+        return new ScaleInReportMessage(connectionId, runtimeType,
+                obj.getInt(FeedConstants.MessageConstants.CURRENT_CARDINALITY),
+                obj.getInt(FeedConstants.MessageConstants.REDUCED_CARDINALITY));
+    }
+
+    public void reset(int currentCardinality, int reducedCardinaliy) {
+        this.currentCardinality = currentCardinality;
+        this.reducedCardinaliy = reducedCardinaliy;
+    }
+
+    public int getCurrentCardinality() {
+        return currentCardinality;
+    }
+
+    public void setCurrentCardinality(int currentCardinality) {
+        this.currentCardinality = currentCardinality;
+    }
+
+    public int getReducedCardinaliy() {
+        return reducedCardinaliy;
+    }
+
+    public void setReducedCardinaliy(int reducedCardinaliy) {
+        this.reducedCardinaliy = reducedCardinaliy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java
new file mode 100644
index 0000000..a1a0ad5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/SocketMessageListener.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IMessageReceiver;
+
+/**
+ * Listens for messages at a configured port and redirects them to an instance of
+ * {@code IMessageReceiver}.
+ * Messages may arrive in parallel from multiple senders. Each sender is handled by
+ * a respective instance of {@code ClientHandler}. A message is a newline-terminated line; it is
+ * forwarded with its trailing {@code '\n'}.
+ */
+public class SocketMessageListener {
+
+    private static final Logger LOGGER = Logger.getLogger(SocketMessageListener.class.getName());
+
+    private final IMessageReceiver<String> messageReceiver;
+    private final MessageListenerServer listenerServer;
+
+    private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    /**
+     * @param port
+     *            the local port to accept connections on
+     * @param messageReceiver
+     *            the receiver every incoming message is handed to
+     */
+    public SocketMessageListener(int port, IMessageReceiver<String> messageReceiver) {
+        this.messageReceiver = messageReceiver;
+        this.listenerServer = new MessageListenerServer(port, messageReceiver);
+    }
+
+    /**
+     * Closes the server socket, closes the message receiver and shuts down the worker pool.
+     * The later steps run even when an earlier one fails.
+     *
+     * @throws IOException
+     *             if closing the server socket fails
+     */
+    public void stop() throws IOException {
+        try {
+            listenerServer.stop();
+        } finally {
+            try {
+                messageReceiver.close(false);
+            } finally {
+                if (!executorService.isShutdown()) {
+                    executorService.shutdownNow();
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts the message receiver and schedules the accept loop.
+     */
+    public void start() {
+        messageReceiver.start();
+        executorService.execute(listenerServer);
+    }
+
+    /**
+     * Accept loop: opens the server socket and hands every accepted connection to a
+     * {@code ClientHandler}.
+     */
+    private static class MessageListenerServer implements Runnable {
+
+        private final int port;
+        private final IMessageReceiver<String> messageReceiver;
+        // Written by the worker thread in run() and read by the thread calling stop().
+        private volatile ServerSocket server;
+        // Set by stop() so that a server socket opened concurrently is closed right away.
+        private volatile boolean stopped;
+        private final Executor executor;
+
+        public MessageListenerServer(int port, IMessageReceiver<String> messageReceiver) {
+            this.port = port;
+            this.messageReceiver = messageReceiver;
+            this.executor = Executors.newCachedThreadPool();
+        }
+
+        /**
+         * Closes the server socket (if already opened), which unblocks a pending accept in
+         * {@link #run()}.
+         *
+         * @throws IOException
+         *             if closing the server socket fails
+         */
+        public void stop() throws IOException {
+            stopped = true;
+            ServerSocket serverSocket = server;
+            // run() may not have created the socket yet.
+            if (serverSocket != null) {
+                serverSocket.close();
+            }
+        }
+
+        @Override
+        public void run() {
+            ServerSocket serverSocket = null;
+            try {
+                serverSocket = new ServerSocket(port);
+                server = serverSocket;
+                if (stopped) {
+                    // stop() ran before the socket existed; the finally block closes it.
+                    return;
+                }
+                while (true) {
+                    Socket client = serverSocket.accept();
+                    executor.execute(new ClientHandler(client, messageReceiver));
+                }
+            } catch (Exception e) {
+                // A failure after stop() is the expected result of closing the server socket.
+                if (!stopped && LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unable to start Message listener" + serverSocket, e);
+                }
+            } finally {
+                if (serverSocket != null) {
+                    try {
+                        serverSocket.close();
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARNING, "Unable to close Message listener " + serverSocket, e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Reads one client's byte stream, splits it on {@code '\n'} and forwards each line (with
+         * its terminator) to the message receiver until the client closes the connection.
+         */
+        private static class ClientHandler implements Runnable {
+
+            private static final char EOL = '\n';
+
+            private final Socket client;
+            private final IMessageReceiver<String> messageReceiver;
+
+            public ClientHandler(Socket client, IMessageReceiver<String> messageReceiver) {
+                this.client = client;
+                this.messageReceiver = messageReceiver;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    InputStream in = client.getInputStream();
+                    StringBuilder line = new StringBuilder();
+                    // read() must be kept as an int: -1 (end of stream) is not representable as a char.
+                    int next;
+                    while ((next = in.read()) != -1) {
+                        if (next == EOL) {
+                            messageReceiver.sendMessage(line.append(EOL).toString());
+                            line.setLength(0);
+                        } else {
+                            // Each byte is widened to a char as-is (no charset decoding).
+                            line.append((char) next);
+                        }
+                    }
+                    // A trailing fragment without a terminating newline is incomplete and is dropped.
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Unable to process mesages from client" + client, e);
+                    }
+                } finally {
+                    if (client != null) {
+                        try {
+                            client.close();
+                        } catch (Exception e) {
+                            LOGGER.log(Level.WARNING, "Unable to close client connection " + client, e);
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java
new file mode 100644
index 0000000..68ce74d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/StorageReportFeedMessage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedConstants.MessageConstants;
+
+/**
+ * A feed control message sent from a storage runtime of a feed pipeline to report the intake timestamp corresponding
+ * to the last persisted tuple.
+ */
+public class StorageReportFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final int partition;
+    private long lastPersistedTupleIntakeTimestamp;
+    private boolean persistenceDelayWithinLimit;
+    private long averageDelay;
+    private int intakePartition;
+
+    public StorageReportFeedMessage(FeedConnectionId connectionId, int partition,
+            long lastPersistedTupleIntakeTimestamp, boolean persistenceDelayWithinLimit, long averageDelay,
+            int intakePartition) {
+        super(MessageType.STORAGE_REPORT);
+        this.connectionId = connectionId;
+        this.partition = partition;
+        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+        this.averageDelay = averageDelay;
+        this.intakePartition = intakePartition;
+    }
+
+    @Override
+    public String toString() {
+        return messageType.name() + " " + connectionId + " [" + lastPersistedTupleIntakeTimestamp + "] ";
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public long getLastPersistedTupleIntakeTimestamp() {
+        return lastPersistedTupleIntakeTimestamp;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public boolean isPersistenceDelayWithinLimit() {
+        return persistenceDelayWithinLimit;
+    }
+
+    public void setPersistenceDelayWithinLimit(boolean persistenceDelayWithinLimit) {
+        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+    }
+
+    public long getAverageDelay() {
+        return averageDelay;
+    }
+
+    public void setAverageDelay(long averageDelay) {
+        this.averageDelay = averageDelay;
+    }
+
+    public int getIntakePartition() {
+        return intakePartition;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP, lastPersistedTupleIntakeTimestamp);
+        obj.put(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT, persistenceDelayWithinLimit);
+        obj.put(MessageConstants.AVERAGE_PERSISTENCE_DELAY, averageDelay);
+        obj.put(FeedConstants.MessageConstants.PARTITION, partition);
+        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+
+        return obj;
+    }
+
+    public static StorageReportFeedMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        int partition = obj.getInt(FeedConstants.MessageConstants.PARTITION);
+        long timestamp = obj.getLong(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP);
+        boolean persistenceDelayWithinLimit = obj.getBoolean(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT);
+        long averageDelay = obj.getLong(MessageConstants.AVERAGE_PERSISTENCE_DELAY);
+        int intakePartition = obj.getInt(MessageConstants.INTAKE_PARTITION);
+        return new StorageReportFeedMessage(connectionId, partition, timestamp, persistenceDelayWithinLimit,
+                averageDelay, intakePartition);
+    }
+
+    public void reset(long lastPersistedTupleIntakeTimestamp, boolean delayWithinLimit, long averageDelay) {
+        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+        this.persistenceDelayWithinLimit = delayWithinLimit;
+        this.averageDelay = averageDelay;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java
new file mode 100644
index 0000000..ab77840
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/TerminateDataFlowMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedConstants;
+
+/**
+ * A feed control message of type {@code MessageType.TERMINATE_FLOW} addressed to a single feed
+ * connection.
+ */
+public class TerminateDataFlowMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+
+    /**
+     * @param connectionId
+     *            the feed connection the message refers to
+     */
+    public TerminateDataFlowMessage(FeedConnectionId connectionId) {
+        super(MessageType.TERMINATE_FLOW);
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * Serializes the message type together with the dataverse, feed name and dataset of the
+     * connection.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        // JSONObject.put returns the object itself, so the entries can be chained.
+        return new JSONObject().put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name())
+                .put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse())
+                .put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName())
+                .put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java
new file mode 100644
index 0000000..0459310
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/ThrottlingEnabledFeedMessage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedConstants;
+
+/**
+ * A feed control message of type {@code MessageType.THROTTLING_ENABLED}. It identifies the feed
+ * connection and the feed runtime (runtime type, partition and operand id) it refers to.
+ */
+public class ThrottlingEnabledFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+
+    private final FeedRuntimeId runtimeId;
+
+    /**
+     * @param connectionId
+     *            the feed connection the message refers to
+     * @param runtimeId
+     *            the feed runtime the message refers to
+     */
+    public ThrottlingEnabledFeedMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId) {
+        super(MessageType.THROTTLING_ENABLED);
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+    }
+
+    @Override
+    public String toString() {
+        // Report the type this message is actually constructed with, not MessageType.END.
+        return MessageType.THROTTLING_ENABLED.name() + "  " + connectionId + " [" + runtimeId + "] ";
+    }
+
+    /**
+     * Serializes the message type, feed identity, dataset and the three components of the
+     * runtime id.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
+        obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
+        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+        return obj;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public FeedRuntimeId getFeedRuntimeId() {
+        return runtimeId;
+    }
+
+    /**
+     * Rebuilds a message from the fields written by {@link #toJSON()}.
+     *
+     * @param obj
+     *            the JSON representation to read from
+     * @return the reconstructed message
+     * @throws JSONException
+     *             if a required key is missing or has the wrong type
+     */
+    public static ThrottlingEnabledFeedMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
+                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+                obj.getInt(FeedConstants.MessageConstants.PARTITION),
+                obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
+        return new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java
new file mode 100644
index 0000000..cef3fa9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/XAQLFeedMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.util.FeedConstants;
+
+/**
+ * A feed control message indicating the need to execute a given AQL statement.
+ * The message carries the AQL text together with the feed connection it is associated with.
+ */
+public class XAQLFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The AQL text carried by this message **/
+    private final String aql;
+    /** The feed connection this message is associated with **/
+    private final FeedConnectionId connectionId;
+
+    /**
+     * Creates a message of type {@code MessageType.XAQL}.
+     *
+     * @param connectionId
+     *            the feed connection the message is associated with
+     * @param aql
+     *            the AQL text to carry
+     */
+    public XAQLFeedMessage(FeedConnectionId connectionId, String aql) {
+        super(MessageType.XAQL);
+        this.aql = aql;
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @return the message type name, the connection id and the bracketed AQL text, separated by spaces
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder(messageType.name());
+        text.append(" ").append(connectionId);
+        text.append(" [").append(aql).append("] ");
+        return text.toString();
+    }
+
+    /**
+     * @return the feed connection this message is associated with
+     */
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /**
+     * @return the AQL text carried by this message
+     */
+    public String getAql() {
+        return this.aql;
+    }
+
+    /**
+     * Serializes this message: message type, dataverse, feed name, dataset name and AQL text are stored
+     * under the corresponding {@code FeedConstants.MessageConstants} keys.
+     *
+     * @return a new JSON object describing this message
+     * @throws JSONException
+     *             if the JSON object rejects one of the entries
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        return new JSONObject()
+                .put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name())
+                .put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse())
+                .put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName())
+                .put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName())
+                .put(FeedConstants.MessageConstants.AQL, aql);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java
new file mode 100644
index 0000000..da5907c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.policy;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class FeedPolicy implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final String dataverseName;
+    // Enforced to be unique within a dataverse.
+    private final String policyName;
+    // A description of the policy
+    private final String description;
+    // The policy properties associated with the feed dataset
+    private Map<String, String> properties;
+
+    public FeedPolicy(String dataverseName, String policyName, String description, Map<String, String> properties) {
+        this.dataverseName = dataverseName;
+        this.policyName = policyName;
+        this.description = description;
+        this.properties = properties;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof FeedPolicy)) {
+            return false;
+        }
+        FeedPolicy otherPolicy = (FeedPolicy) other;
+        if (!otherPolicy.dataverseName.equals(dataverseName)) {
+            return false;
+        }
+        if (!otherPolicy.policyName.equals(policyName)) {
+            return false;
+        }
+        return true;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
new file mode 100644
index 0000000..077a58d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyAccessor.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.policy;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A utility class to access the configuration parameters of a feed ingestion policy.
+ */
+public class FeedPolicyAccessor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * --------------------------
+     * failure configuration
+     * --------------------------
+     **/
+
+    /** continue feed ingestion after a soft (runtime) failure **/
+    public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
+
+    /** log failed tuple to an asterixdb dataset for future reference **/
+    public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
+
+    /** continue feed ingestion after loss of one or more machines (hardware failure) **/
+    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
+
+    /** auto-start a loser feed when the asterixdb instance is restarted **/
+    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
+
+    /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
+    public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
+
+    /**
+     * --------------------------
+     * flow control configuration
+     * --------------------------
+     **/
+
+    /** enable buffering in feeds **/
+    public static final String BUFFERING_ENABLED = "buffering.enabled";
+
+    /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
+    public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
+
+    /** the maximum size of data (tuples) that can be spilled to disk **/
+    public static final String MAX_SPILL_SIZE_ON_DISK = "max.spill.size.on.disk";
+
+    /** discard tuples altogether if an operator cannot process incoming data at its arrival rate **/
+    public static final String DISCARD_ON_CONGESTION = "discard.on.congestion";
+
+    /** maximum fraction of ingested data that can be discarded **/
+    public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
+
+    /** maximum end-to-end delay/latency in persisting a tuple through the feed ingestion pipeline **/
+    public static final String MAX_DELAY_RECORD_PERSISTENCE = "max.delay.record.persistence";
+
+    /** rate limit the inflow of tuples in accordance with the maximum capacity of the pipeline **/
+    public static final String THROTTLING_ENABLED = "throttling.enabled";
+
+    /** elasticity **/
+    public static final String ELASTIC = "elastic";
+
+    /** statistics **/
+    public static final String TIME_TRACKING = "time.tracking";
+
+    /** logging of statistics **/
+    public static final String LOGGING_STATISTICS = "logging.statistics";
+
+    public static final long NO_LIMIT = -1;
+
+    private Map<String, String> feedPolicy;
+
+    public Map<String, String> getFeedPolicy() {
+        return feedPolicy;
+    }
+
+    public FeedPolicyAccessor(Map<String, String> feedPolicy) {
+        this.feedPolicy = feedPolicy;
+    }
+
+    public void reset(Map<String, String> feedPolicy) {
+        this.feedPolicy = feedPolicy;
+    }
+
+    /** Failure recover/reporting **/
+
+    public boolean logDataOnSoftFailure() {
+        return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
+    }
+
+    public boolean continueOnSoftFailure() {
+        return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
+    }
+
+    public boolean continueOnHardwareFailure() {
+        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
+    }
+
+    public boolean autoRestartOnClusterReboot() {
+        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
+    }
+
+    public boolean atleastOnceSemantics() {
+        return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
+    }
+
+    /** flow control **/
+    public boolean bufferingEnabled() {
+        return getBooleanPropertyValue(BUFFERING_ENABLED, false);
+    }
+
+    public boolean spillToDiskOnCongestion() {
+        return getBooleanPropertyValue(SPILL_TO_DISK_ON_CONGESTION, false);
+    }
+
+    public boolean discardOnCongestion() {
+        return getMaxFractionDiscard() > 0;
+    }
+
+    public boolean throttlingEnabled() {
+        return getBooleanPropertyValue(THROTTLING_ENABLED, false);
+    }
+
+    public long getMaxSpillOnDisk() {
+        return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
+    }
+
+    public float getMaxFractionDiscard() {
+        return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
+    }
+
+    public long getMaxDelayRecordPersistence() {
+        return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, Long.MAX_VALUE);
+    }
+
+    /** Elasticity **/
+    public boolean isElastic() {
+        return getBooleanPropertyValue(ELASTIC, false);
+    }
+
+    /** Statistics **/
+    public boolean isTimeTrackingEnabled() {
+        return getBooleanPropertyValue(TIME_TRACKING, false);
+    }
+
+    /** Logging of statistics **/
+    public boolean isLoggingStatisticsEnabled() {
+        return getBooleanPropertyValue(LOGGING_STATISTICS, false);
+    }
+
+    private boolean getBooleanPropertyValue(String key, boolean defValue) {
+        String v = feedPolicy.get(key);
+        return v == null ? false : Boolean.valueOf(v);
+    }
+
+    private long getLongPropertyValue(String key, long defValue) {
+        String v = feedPolicy.get(key);
+        return v != null ? Long.parseLong(v) : defValue;
+    }
+
+    private float getFloatPropertyValue(String key, float defValue) {
+        String v = feedPolicy.get(key);
+        return v != null ? Float.parseFloat(v) : defValue;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
new file mode 100644
index 0000000..e0944ad
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/policy/FeedPolicyEnforcer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.policy;
+
+import java.rmi.RemoteException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+
+public class FeedPolicyEnforcer {
+
+    private final FeedConnectionId connectionId;
+    private final FeedPolicyAccessor policyAccessor;
+
+    public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, String> feedPolicy) {
+        this.connectionId = feedConnectionId;
+        this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
+    }
+
+    public boolean continueIngestionPostSoftwareFailure(Exception e) throws RemoteException, ACIDException {
+        return policyAccessor.continueOnSoftFailure();
+    }
+
+    public FeedPolicyAccessor getFeedPolicyAccessor() {
+        return policyAccessor;
+    }
+
+    public FeedConnectionId getFeedId() {
+        return connectionId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
new file mode 100644
index 0000000..3ac28c9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * The class in charge of executing feed adapters.
+ * <p>
+ * {@link #run()} keeps (re)starting the adapter until it either completes or fails in a way that cannot
+ * be resolved, then records the outcome on the runtime manager and wakes up every thread waiting on the
+ * manager's monitor.
+ */
+public class AdapterExecutor implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
+
+    private final DistributeFeedFrameWriter writer;     // A writer that sends frames to multiple receivers (that can
+                                                        // increase or decrease at any time)
+    private final IFeedAdapter adapter;                 // The adapter
+    private final IAdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility -->
+
+    /**
+     * @param partition
+     *            not used by this constructor; the partition is read from {@code adapterManager} in
+     *            {@link #run()}
+     * @param writer
+     *            the writer handed to the adapter when it is started
+     * @param adapter
+     *            the adapter to execute
+     * @param adapterManager
+     *            the runtime manager that receives the final state and is notified on completion
+     */
+    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IFeedAdapter adapter,
+            IAdapterRuntimeManager adapterManager) {
+        this.writer = writer;
+        this.adapter = adapter;
+        this.adapterManager = adapterManager;
+    }
+
+    /**
+     * Runs the adapter until it finishes or fails.
+     * <p>
+     * When the adapter throws an exception that {@link ExternalDataExceptionUtils#isResolvable} accepts,
+     * the adapter is asked whether to continue and, if so, is started again. In every case, including
+     * an unexpected throwable escaping the loop, the final state is set on the runtime manager and all
+     * threads waiting on it are notified.
+     */
+    @Override
+    public void run() {
+        // Start by getting the partition number from the manager
+        int partition = adapterManager.getPartition();
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Starting ingestion for partition:" + partition);
+        }
+        boolean continueIngestion = true;
+        // Assume failure until the adapter completes normally, so that anything escaping the loop below
+        // (e.g. handleException itself throwing) is still reported as a failed ingestion.
+        boolean failedIngestion = true;
+        try {
+            while (continueIngestion) {
+                try {
+                    // Start the adapter
+                    adapter.start(partition, writer);
+                    // Adapter has completed execution
+                    continueIngestion = false;
+                    failedIngestion = false;
+                } catch (Exception e) {
+                    LOGGER.error("Exception during feed ingestion ", e);
+                    // Only a resolvable failure lets the adapter decide whether to continue ingestion
+                    continueIngestion = ExternalDataExceptionUtils.isResolvable(e) && adapter.handleException(e);
+                }
+            }
+        } finally {
+            // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and
+            // notifying the runtime manager. Done in finally so waiters are never left hanging.
+            adapterManager.setState(failedIngestion ? State.FAILED_INGESTION : State.FINISHED_INGESTION);
+            synchronized (adapterManager) {
+                adapterManager.notifyAll();
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
new file mode 100644
index 0000000..6c3e44d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.log4j.Logger;
+
+/**
+ * This class manages the execution of an adapter within a feed
+ */
+public class AdapterRuntimeManager implements IAdapterRuntimeManager {
+
+    private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
+
+    private final FeedId feedId;                    // (dataverse-feed)
+
+    private final IFeedAdapter feedAdapter;         // The adapter
+
+    private final IIntakeProgressTracker tracker;   // Not used. needs to be fixed soon.
+
+    private final AdapterExecutor adapterExecutor;  // The executor for the adapter <-- two way visibility -->
+
+    private final int partition;                    // The partition number
+
+    private final ExecutorService executorService;  // Executor service to run/shutdown the adapter executor
+
+    private IngestionRuntime ingestionRuntime;      // Runtime representing the ingestion stage of a feed <-- two way
+                                                    // visibility -->
+
+    private State state;                            // One of {ACTIVE_INGESTION, NACTIVE_INGESTION, FINISHED_INGESTION,
+                                                    // FAILED_INGESTION}
+
+    public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, IIntakeProgressTracker tracker,
+            DistributeFeedFrameWriter writer, int partition) {
+        this.feedId = feedId;
+        this.feedAdapter = feedAdapter;
+        this.tracker = tracker;
+        this.partition = partition;
+        this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
+        this.executorService = Executors.newSingleThreadExecutor();
+        this.state = State.INACTIVE_INGESTION;
+    }
+
+    @Override
+    public void start() throws Exception {
+        state = State.ACTIVE_INGESTION;
+        executorService.execute(adapterExecutor);
+    }
+
+    @Override
+    public void stop() {
+        boolean stopped = false;
+        try {
+            stopped = feedAdapter.stop();
+        } catch (Exception exception) {
+            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
+        } finally {
+            state = State.FINISHED_INGESTION;
+            if (stopped) {
+                // stop() returned true, we wait for the process termination
+                executorService.shutdown();
+                try {
+                    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
+                }
+            } else {
+                // stop() returned false, we try to force shutdown
+                executorService.shutdownNow();
+            }
+
+        }
+    }
+
+    @Override
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    @Override
+    public String toString() {
+        return feedId + "[" + partition + "]";
+    }
+
+    @Override
+    public IFeedAdapter getFeedAdapter() {
+        return feedAdapter;
+    }
+
+    public IIntakeProgressTracker getTracker() {
+        return tracker;
+    }
+
+    @Override
+    public synchronized State getState() {
+        return state;
+    }
+
+    @Override
+    public synchronized void setState(State state) {
+        this.state = state;
+    }
+
+    public AdapterExecutor getAdapterExecutor() {
+        return adapterExecutor;
+    }
+
+    @Override
+    public int getPartition() {
+        return partition;
+    }
+
+    public IngestionRuntime getIngestionRuntime() {
+        return ingestionRuntime;
+    }
+
+    @Override
+    public IIntakeProgressTracker getProgressTracker() {
+        return tracker;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
new file mode 100644
index 0000000..967dc3e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.Map;
+
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.ISubscriberRuntime;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+/**
+ * Represents the feed runtime that collects feed tuples from another feed.
+ * In case of a primary feed, the CollectionRuntime collects tuples from the feed
+ * intake job. For a secondary feed, tuples are collected from the intake/compute
+ * runtime associated with the source feed.
+ */
+public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
+
+    private final FeedConnectionId connectionId;            // [Dataverse - Feed - Dataset]
+    private final ISubscribableRuntime sourceRuntime;       // Runtime that provides the data
+    private final Map<String, String> feedPolicy;           // Policy associated with the feed
+    private FeedFrameCollector frameCollector;              // Collector that can be plugged into a frame distributor
+
+    /**
+     * @param connectionId
+     *            the feed connection this runtime collects for
+     * @param runtimeId
+     *            the runtime id, passed to the superclass
+     * @param inputSideHandler
+     *            the input handler, passed to the superclass
+     * @param outputSideWriter
+     *            the output frame writer, passed to the superclass
+     * @param sourceRuntime
+     *            the runtime that provides the data
+     * @param feedPolicy
+     *            the policy parameters associated with the feed
+     */
+    public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
+            Map<String, String> feedPolicy) {
+        super(runtimeId, inputSideHandler, outputSideWriter);
+        this.feedPolicy = feedPolicy;
+        this.sourceRuntime = sourceRuntime;
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * Blocks until the frame collector reports the {@code FINISHED} or {@code HANDOVER} state, waiting
+     * on the collector's monitor in between checks.
+     *
+     * @return the collector's state once collection is over
+     * @throws InterruptedException
+     *             if the calling thread is interrupted while waiting
+     */
+    public State waitTillCollectionOver() throws InterruptedException {
+        if (isCollectionOver()) {
+            // Already done: no need to take the collector's monitor
+            return frameCollector.getState();
+        }
+        synchronized (frameCollector) {
+            while (!isCollectionOver()) {
+                frameCollector.wait();
+            }
+        }
+        return frameCollector.getState();
+    }
+
+    /**
+     * @return {@code true} when the collector's state is {@code FINISHED} or {@code HANDOVER}
+     */
+    private boolean isCollectionOver() {
+        if (frameCollector.getState().equals(State.FINISHED)) {
+            return true;
+        }
+        return frameCollector.getState().equals(State.HANDOVER);
+    }
+
+    /**
+     * Forwards the mode to this runtime's input handler.
+     */
+    @Override
+    public void setMode(Mode mode) {
+        getInputHandler().setMode(mode);
+    }
+
+    @Override
+    public Map<String, String> getFeedPolicy() {
+        return this.feedPolicy;
+    }
+
+    /**
+     * @return the feed connection this runtime collects for
+     */
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /**
+     * @return the runtime that provides the data
+     */
+    public ISubscribableRuntime getSourceRuntime() {
+        return this.sourceRuntime;
+    }
+
+    /**
+     * @param frameCollector
+     *            the collector whose state {@link #waitTillCollectionOver()} observes
+     */
+    public void setFrameCollector(FeedFrameCollector frameCollector) {
+        this.frameCollector = frameCollector;
+    }
+
+    @Override
+    public FeedFrameCollector getFrameCollector() {
+        return this.frameCollector;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
new file mode 100644
index 0000000..76b1b19
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntime.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
+import org.apache.asterix.external.feed.api.IFeedRuntime;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+/**
+ * Base feed runtime: ties a {@link FeedRuntimeId} to the input handler and the
+ * output frame writer supplied at construction time.
+ * <p>
+ * The input handler may be {@code null} (for example, {@code IngestionRuntime}
+ * constructs its parent with a null handler); the mode accessors tolerate that.
+ */
+public class FeedRuntime implements IFeedRuntime {
+
+    /** A unique identifier for the runtime **/
+    protected final FeedRuntimeId runtimeId;
+
+    /** The output frame writer associated with the runtime **/
+    protected IFrameWriter frameWriter;
+
+    /** The pre-processor associated with the runtime; may be null **/
+    protected FeedRuntimeInputHandler inputHandler;
+
+    /**
+     * @param runtimeId
+     *            identifier of this runtime
+     * @param inputHandler
+     *            input-side handler, or {@code null} if the runtime has none
+     * @param frameWriter
+     *            output frame writer of this runtime
+     */
+    public FeedRuntime(FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter) {
+        this.runtimeId = runtimeId;
+        this.frameWriter = frameWriter;
+        this.inputHandler = inputHandler;
+    }
+
+    /**
+     * Replaces the output frame writer of this runtime.
+     *
+     * @param frameWriter
+     *            the new output-side writer
+     */
+    public void setFrameWriter(IFeedOperatorOutputSideHandler frameWriter) {
+        this.frameWriter = frameWriter;
+    }
+
+    /** @return the identifier of this runtime */
+    @Override
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    /** @return the current output frame writer */
+    @Override
+    public IFrameWriter getFeedFrameWriter() {
+        return frameWriter;
+    }
+
+    /** @return the string form of the runtime id */
+    @Override
+    public String toString() {
+        return runtimeId.toString();
+    }
+
+    /** @return the input handler, or {@code null} if none was supplied */
+    @Override
+    public FeedRuntimeInputHandler getInputHandler() {
+        return inputHandler;
+    }
+
+    /**
+     * @return the mode of the input handler, or {@code Mode.PROCESS} when this
+     *         runtime has no input handler
+     */
+    public Mode getMode() {
+        return inputHandler != null ? inputHandler.getMode() : Mode.PROCESS;
+    }
+
+    /**
+     * Applies the given mode to the input handler. When this runtime has no
+     * input handler the call is a no-op, mirroring {@link #getMode()}, instead
+     * of failing with a NullPointerException.
+     *
+     * @param mode
+     *            the mode to apply
+     */
+    public void setMode(Mode mode) {
+        if (inputHandler != null) {
+            inputHandler.setMode(mode);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
new file mode 100644
index 0000000..45d8afe
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * Immutable, serializable identifier of a feed runtime, made of the runtime
+ * type, the partition number and an operand id.
+ */
+public class FeedRuntimeId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Operand id used when a runtime has no specific operand. */
+    public static final String DEFAULT_OPERAND_ID = "N/A";
+
+    private final FeedRuntimeType runtimeType;
+    private final int partition;
+    private final String operandId;
+
+    /**
+     * @param runtimeType
+     *            type of the runtime
+     * @param partition
+     *            partition number of the runtime
+     * @param operandId
+     *            operand id; {@link #DEFAULT_OPERAND_ID} is available as a default
+     */
+    public FeedRuntimeId(FeedRuntimeType runtimeType, int partition, String operandId) {
+        this.runtimeType = runtimeType;
+        this.partition = partition;
+        this.operandId = operandId;
+    }
+
+    /** @return {@code <runtimeType>[<partition>]{<operandId>}} */
+    @Override
+    public String toString() {
+        return runtimeType + "[" + partition + "]" + "{" + operandId + "}";
+    }
+
+    /**
+     * Two ids are equal when runtime type, operand id and partition all match.
+     * The comparison is null-safe: the constructor does not reject a null
+     * runtime type or operand id, so equality must not throw on them.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FeedRuntimeId)) {
+            return false;
+        }
+        // NOTE(review): subclasses that add fields to equals() can make this
+        // comparison asymmetric (base.equals(sub) vs sub.equals(base)) - confirm
+        // whether mixed comparisons occur in practice.
+        FeedRuntimeId other = (FeedRuntimeId) o;
+        return partition == other.getPartition() && Objects.equals(runtimeType, other.getFeedRuntimeType())
+                && Objects.equals(operandId, other.getOperandId());
+    }
+
+    /**
+     * Hash derived from {@link #toString()}, which is built from exactly the
+     * fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    /** @return the runtime type (same value as {@link #getRuntimeType()}) */
+    public FeedRuntimeType getFeedRuntimeType() {
+        return runtimeType;
+    }
+
+    /** @return the partition number */
+    public int getPartition() {
+        return partition;
+    }
+
+    /** @return the runtime type (same value as {@link #getFeedRuntimeType()}) */
+    public FeedRuntimeType getRuntimeType() {
+        return runtimeType;
+    }
+
+    /** @return the operand id */
+    public String getOperandId() {
+        return operandId;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
new file mode 100644
index 0000000..fd6fcb3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.logging.Level;
+
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FrameDistributor;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class IngestionRuntime extends SubscribableRuntime {
+
+    private final IAdapterRuntimeManager adapterRuntimeManager;
+
+    public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
+            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
+        super(feedId, runtimeId, null, feedWriter, recordDesc);
+        this.adapterRuntimeManager = adaptorRuntimeManager;
+    }
+
+    @Override
+    public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception {
+        FeedFrameCollector reader = dWriter.subscribeFeed(fpa, collectionRuntime.getInputHandler(),
+                collectionRuntime.getConnectionId());
+        collectionRuntime.setFrameCollector(reader);
+
+        if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+            adapterRuntimeManager.start();
+        }
+        subscribers.add(collectionRuntime);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
+        }
+    }
+
+    @Override
+    public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+        if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Stopping adapter for " + this + " as no more registered collectors");
+            }
+            adapterRuntimeManager.stop();
+        } else {
+            dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
+        }
+        subscribers.remove(collectionRuntime);
+    }
+
+    public void endOfFeed() {
+        dWriter.notifyEndOfFeed();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Notified End Of Feed  [" + this + "]");
+        }
+    }
+
+    public IAdapterRuntimeManager getAdapterRuntimeManager() {
+        return adapterRuntimeManager;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java
new file mode 100644
index 0000000..f6db99c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableFeedRuntimeId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedId;
+
+public class SubscribableFeedRuntimeId extends FeedRuntimeId {
+    private static final long serialVersionUID = 1L;
+    private final FeedId feedId;
+
+    public SubscribableFeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition) {
+        super(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
+        this.feedId = feedId;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof SubscribableFeedRuntimeId)) {
+            return false;
+        }
+
+        return (super.equals(o) && this.feedId.equals(((SubscribableFeedRuntimeId) o).getFeedId()));
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode() + feedId.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
new file mode 100644
index 0000000..056875c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/SubscribableRuntime.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.runtime;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.ISubscriberRuntime;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Feed runtime that other (collection) runtimes can subscribe to. Subscriptions
+ * are delegated to a {@link DistributeFeedFrameWriter} and tracked in a list.
+ */
+public class SubscribableRuntime extends FeedRuntime implements ISubscribableRuntime {
+
+    protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
+
+    protected final FeedId feedId;
+    protected final List<ISubscriberRuntime> subscribers;
+    protected final RecordDescriptor recordDescriptor;
+    protected final DistributeFeedFrameWriter dWriter;
+
+    /**
+     * @param feedId
+     *            id of the feed
+     * @param runtimeId
+     *            id of this runtime
+     * @param inputHandler
+     *            input handler passed on to the parent runtime
+     * @param dWriter
+     *            distributing writer; also used as the parent's frame writer
+     * @param recordDescriptor
+     *            record descriptor exposed by this runtime
+     */
+    public SubscribableRuntime(FeedId feedId, FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler,
+            DistributeFeedFrameWriter dWriter, RecordDescriptor recordDescriptor) {
+        super(runtimeId, inputHandler, dWriter);
+        this.feedId = feedId;
+        this.dWriter = dWriter;
+        this.recordDescriptor = recordDescriptor;
+        this.subscribers = new ArrayList<ISubscriberRuntime>();
+    }
+
+    /** @return the id of the feed */
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    /** @return the runtime type taken from the runtime id */
+    public FeedRuntimeType getFeedRuntimeType() {
+        return runtimeId.getFeedRuntimeType();
+    }
+
+    /** @return the record descriptor supplied at construction */
+    @Override
+    public RecordDescriptor getRecordDescriptor() {
+        return recordDescriptor;
+    }
+
+    /** @return the distributing writer */
+    @Override
+    public DistributeFeedFrameWriter getFeedFrameWriter() {
+        return dWriter;
+    }
+
+    /** @return the live (not copied) list of subscribers */
+    @Override
+    public synchronized List<ISubscriberRuntime> getSubscribers() {
+        return subscribers;
+    }
+
+    /**
+     * Subscribes the collection runtime's input handler to the distributing
+     * writer, stores the returned frame collector on the collection runtime and
+     * records it as a subscriber.
+     * <p>
+     * The {@code fpa} argument is not used here: a new accessor is built from
+     * the collection runtime's own feed policy.
+     */
+    @Override
+    public synchronized void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime)
+            throws Exception {
+        FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(collectionRuntime.getFeedPolicy());
+        FeedFrameCollector frameCollector = dWriter.subscribeFeed(policyAccessor,
+                collectionRuntime.getInputHandler(), collectionRuntime.getConnectionId());
+        collectionRuntime.setFrameCollector(frameCollector);
+        subscribers.add(collectionRuntime);
+    }
+
+    /**
+     * Unsubscribes the collection runtime from the distributing writer and
+     * drops it from the subscriber list.
+     * <p>
+     * NOTE(review): this passes the collection runtime's frame writer, whereas
+     * subscribeFeed registers its input handler - confirm this is intended.
+     */
+    @Override
+    public synchronized void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+        dWriter.unsubscribeFeed(collectionRuntime.getFeedFrameWriter());
+        subscribers.remove(collectionRuntime);
+    }
+
+    /** @return {@code SubscribableRuntime [<feedId>](<runtimeId>)} */
+    @Override
+    public String toString() {
+        return "SubscribableRuntime [" + feedId + "](" + runtimeId + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java
new file mode 100644
index 0000000..ad40608
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/BasicMonitoredBuffer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * {@link MonitoredBuffer} variant with every monitoring, logging and reporting
+ * hook switched off and no frame pre- or post-processor.
+ */
+public class BasicMonitoredBuffer extends MonitoredBuffer {
+
+    /** Passes all arguments through to the {@link MonitoredBuffer} constructor unchanged. */
+    public BasicMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+    }
+
+    /** @return always null: no pre-processor is used */
+    @Override
+    protected IFramePreprocessor getFramePreProcessor() {
+        return null;
+    }
+
+    /** @return always null: no post-processor is used */
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return null;
+    }
+
+    /** @return always false */
+    @Override
+    protected boolean monitorProcessingRate() {
+        return false;
+    }
+
+    /** @return always false */
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return false;
+    }
+
+    /** @return always false */
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return false;
+    }
+
+    /** @return always false */
+    @Override
+    protected boolean reportInflowRate() {
+        return false;
+    }
+
+    /** @return always false */
+    @Override
+    protected boolean reportOutflowRate() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java
new file mode 100644
index 0000000..211fc7b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/ComputeSideMonitoredBuffer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * {@link MonitoredBuffer} variant that enables processing-rate monitoring,
+ * inflow/outflow rate logging and input-queue-length monitoring, while leaving
+ * inflow/outflow rate reporting off and using no frame pre- or post-processor.
+ */
+public class ComputeSideMonitoredBuffer extends MonitoredBuffer {
+
+    /** Passes all arguments through to the {@link MonitoredBuffer} constructor unchanged. */
+    public ComputeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+    }
+
+    /** @return always true */
+    @Override
+    protected boolean monitorProcessingRate() {
+        return true;
+    }
+
+    /** @return always true */
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return true;
+    }
+
+    /** @return always true */
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return true;
+    }
+
+    /** @return always null: no pre-processor is used */
+    @Override
+    protected IFramePreprocessor getFramePreProcessor() {
+        return null;
+    }
+
+    /** @return always null: no post-processor is used */
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return null;
+    }
+
+    /** @return always false */
+    @Override
+    protected boolean reportOutflowRate() {
+        return false;
+    }
+
+    /** @return always false */
+    @Override
+    protected boolean reportInflowRate() {
+        return false;
+    }
+
+}