You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:11 UTC

[14/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java
new file mode 100644
index 0000000..1130905
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+public class FeedTupleCommitAckMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private int intakePartition;
+    private int base;
+    private byte[] commitAcks;
+
+    public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
+        super(MessageType.COMMIT_ACK);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        obj.put(FeedConstants.MessageConstants.BASE, base);
+        String commitAcksString = DatatypeConverter.printBase64Binary(commitAcks);
+        obj.put(FeedConstants.MessageConstants.COMMIT_ACKS, commitAcksString);
+        return obj;
+    }
+
+    public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        int base = obj.getInt(FeedConstants.MessageConstants.BASE);
+        String commitAcksString = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
+        byte[] commitAcks = DatatypeConverter.parseBase64Binary(commitAcksString);
+        return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public int getIntakePartition() {
+        return intakePartition;
+    }
+
+    public byte[] getCommitAcks() {
+        return commitAcks;
+    }
+
+    public void reset(int intakePartition, int base, byte[] commitAcks) {
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    public int getBase() {
+        return base;
+    }
+
+    public void setBase(int base) {
+        this.base = base;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java
new file mode 100644
index 0000000..b861e63
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+public class FeedTupleCommitResponseMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final int intakePartition;
+    private final int maxWindowAcked;
+
+    public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
+        super(MessageType.COMMIT_ACK_RESPONSE);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.maxWindowAcked = maxWindowAcked;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
+        return obj;
+    }
+
+    @Override
+    public String toString() {
+        return connectionId + "[" + intakePartition + "]" + "(" + maxWindowAcked + ")";
+    }
+
+    public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
+        return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public int getMaxWindowAcked() {
+        return maxWindowAcked;
+    }
+
+    public int getIntakePartition() {
+        return intakePartition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java
new file mode 100644
index 0000000..611f613
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+
+/**
+ * Represents an expandable collection of frames.
+ */
+public class FrameCollection implements IFeedMemoryComponent {
+
+    /** A unique identifier for the feed memory component **/
+    private final int componentId;
+
+    /** A collection of frames (each being a ByteBuffer) **/
+    private final List<ByteBuffer> frames = new LinkedList<ByteBuffer>();
+
+    /** The permitted maximum size, the collection may grow to **/
+    private int maxSize;
+
+    /** The {@link IFeedMemoryManager} for the NodeController **/
+    private final IFeedMemoryManager memoryManager;
+
+    public FrameCollection(int componentId, IFeedMemoryManager memoryManager, int maxSize) {
+        this.componentId = componentId;
+        this.maxSize = maxSize;
+        this.memoryManager = memoryManager;
+    }
+
+    public boolean addFrame(ByteBuffer frame) {
+        if (frames.size() == maxSize) {
+            boolean expansionGranted = memoryManager.expandMemoryComponent(this);
+            if (!expansionGranted) {
+                return false;
+            }
+        }
+        ByteBuffer storageBuffer = ByteBuffer.allocate(frame.capacity());
+        storageBuffer.put(frame);
+        frames.add(storageBuffer);
+        storageBuffer.flip();
+        return true;
+    }
+
+    public Iterator<ByteBuffer> getFrameCollectionIterator() {
+        return frames.iterator();
+    }
+
+    @Override
+    public int getTotalAllocation() {
+        return frames.size();
+    }
+
+    @Override
+    public Type getType() {
+        return Type.COLLECTION;
+    }
+
+    @Override
+    public int getComponentId() {
+        return componentId;
+    }
+
+    @Override
+    public void expand(int delta) {
+        maxSize = maxSize + delta;
+    }
+
+    @Override
+    public void reset() {
+        frames.clear();
+        maxSize = IFeedMemoryManager.START_COLLECTION_SIZE;
+    }
+
+    @Override
+    public String toString() {
+        return "FrameCollection" + "[" + componentId + "]" + "(" + frames.size() + "/" + maxSize + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java
new file mode 100644
index 0000000..937e8f8
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent.Type;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FrameDistributor {
+
+    private static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
+
+    private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
+
+    private final IHyracksTaskContext ctx;
+    private final FeedId feedId;
+    private final FeedRuntimeType feedRuntimeType;
+    private final int partition;
+    private final IFeedMemoryManager memoryManager;
+    private final boolean enableSynchronousTransfer;
+    /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/
+    private final Map<IFrameWriter, FeedFrameCollector> registeredCollectors;
+    private final FrameTupleAccessor fta;
+
+    private DataBucketPool pool;
+    private DistributionMode distributionMode;
+    private boolean spillToDiskRequired = false;
+
+    public enum DistributionMode {
+        /**
+         * A single feed frame collector is registered for receiving tuples.
+         * Tuple is sent via synchronous call, that is no buffering is involved
+         **/
+        SINGLE,
+
+        /**
+         * Multiple feed frame collectors are concurrently registered for
+         * receiving tuples.
+         **/
+        SHARED,
+
+        /**
+         * Feed tuples are not being processed, irrespective of # of registered
+         * feed frame collectors.
+         **/
+        INACTIVE
+    }
+
+    public FrameDistributor(IHyracksTaskContext ctx, FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
+            boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
+            throws HyracksDataException {
+        this.ctx = ctx;
+        this.feedId = feedId;
+        this.feedRuntimeType = feedRuntimeType;
+        this.partition = partition;
+        this.memoryManager = memoryManager;
+        this.enableSynchronousTransfer = enableSynchronousTransfer;
+        this.registeredCollectors = new HashMap<IFrameWriter, FeedFrameCollector>();
+        this.distributionMode = DistributionMode.INACTIVE;
+        this.fta = fta;
+    }
+
+    public void notifyEndOfFeed() {
+        DataBucket bucket = getDataBucket();
+        if (bucket != null) {
+            sendEndOfFeedDataBucket(bucket);
+        } else {
+            while (bucket == null) {
+                try {
+                    Thread.sleep(MEMORY_AVAILABLE_POLL_PERIOD);
+                    bucket = getDataBucket();
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+            if (bucket != null) {
+                sendEndOfFeedDataBucket(bucket);
+            }
+        }
+    }
+
+    private void sendEndOfFeedDataBucket(DataBucket bucket) {
+        bucket.setContentType(DataBucket.ContentType.EOD);
+        nextBucket(bucket);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("End of feed data packet sent " + this.feedId);
+        }
+    }
+
+    public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) {
+        DistributionMode currentMode = distributionMode;
+        switch (distributionMode) {
+            case INACTIVE:
+                if (!enableSynchronousTransfer) {
+                    pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+                    frameCollector.start();
+                }
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                setMode(DistributionMode.SINGLE);
+                break;
+            case SINGLE:
+                pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                for (FeedFrameCollector reader : registeredCollectors.values()) {
+                    reader.start();
+                }
+                setMode(DistributionMode.SHARED);
+                break;
+            case SHARED:
+                frameCollector.start();
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                break;
+        }
+        evaluateIfSpillIsEnabled();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id "
+                    + feedId);
+        }
+    }
+
+    public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
+        switch (distributionMode) {
+            case INACTIVE:
+                throw new IllegalStateException("Invalid attempt to deregister frame collector in " + distributionMode
+                        + " mode.");
+            case SHARED:
+                frameCollector.closeCollector();
+                registeredCollectors.remove(frameCollector.getFrameWriter());
+                int nCollectors = registeredCollectors.size();
+                if (nCollectors == 1) {
+                    FeedFrameCollector loneCollector = registeredCollectors.values().iterator().next();
+                    setMode(DistributionMode.SINGLE);
+                    loneCollector.setState(FeedFrameCollector.State.TRANSITION);
+                    loneCollector.closeCollector();
+                    memoryManager.releaseMemoryComponent(pool);
+                    evaluateIfSpillIsEnabled();
+                } else {
+                    if (!spillToDiskRequired) {
+                        evaluateIfSpillIsEnabled();
+                    }
+                }
+                break;
+            case SINGLE:
+                frameCollector.closeCollector();
+                setMode(DistributionMode.INACTIVE);
+                spillToDiskRequired = false;
+                break;
+
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Deregistered frame reader" + frameCollector + " from feed distributor for " + feedId);
+        }
+    }
+
+    public void evaluateIfSpillIsEnabled() {
+        if (!spillToDiskRequired) {
+            for (FeedFrameCollector collector : registeredCollectors.values()) {
+                spillToDiskRequired = spillToDiskRequired
+                        || collector.getFeedPolicyAccessor().spillToDiskOnCongestion();
+                if (spillToDiskRequired) {
+                    break;
+                }
+            }
+        }
+    }
+
+    public boolean deregisterFrameCollector(IFrameWriter frameWriter) {
+        FeedFrameCollector collector = registeredCollectors.get(frameWriter);
+        if (collector != null) {
+            deregisterFrameCollector(collector);
+            return true;
+        }
+        return false;
+    }
+
+    public synchronized void setMode(DistributionMode mode) {
+        this.distributionMode = mode;
+    }
+
+    public boolean isRegistered(IFrameWriter writer) {
+        return registeredCollectors.get(writer) != null;
+    }
+
+    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        switch (distributionMode) {
+            case INACTIVE:
+                break;
+            case SINGLE:
+                FeedFrameCollector collector = registeredCollectors.values().iterator().next();
+                switch (collector.getState()) {
+                    case HANDOVER:
+                    case ACTIVE:
+                        if (enableSynchronousTransfer) {
+                            collector.nextFrame(frame); // processing is synchronous
+                        } else {
+                            handleDataBucket(frame);
+                        }
+                        break;
+                    case TRANSITION:
+                        handleDataBucket(frame);
+                        break;
+                    case FINISHED:
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Discarding fetched tuples, feed has ended [" + registeredCollectors.get(0)
+                                    + "]" + " Feed Id " + feedId + " frame distributor " + this.getFeedRuntimeType());
+                        }
+                        registeredCollectors.remove(0);
+                        break;
+                }
+                break;
+            case SHARED:
+                handleDataBucket(frame);
+                break;
+        }
+    }
+
+    private void nextBucket(DataBucket bucket) {
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            collector.sendMessage(bucket); // asynchronous call
+        }
+    }
+
+    private void handleDataBucket(ByteBuffer frame) throws HyracksDataException {
+        DataBucket bucket = getDataBucket();
+        if (bucket == null) {
+            handleFrameDuringMemoryCongestion(frame);
+        } else {
+            bucket.reset(frame);
+            bucket.setDesiredReadCount(registeredCollectors.size());
+            nextBucket(bucket);
+        }
+    }
+
+    private void handleFrameDuringMemoryCongestion(ByteBuffer frame) throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Unable to allocate memory, will evaluate the need to spill");
+        }
+        // wait till memory is available
+    }
+
+    private DataBucket getDataBucket() {
+        DataBucket bucket = null;
+        if (pool != null) {
+            bucket = pool.getDataBucket();
+            if (bucket != null) {
+                bucket.setDesiredReadCount(registeredCollectors.size());
+                return bucket;
+            } else {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    public DistributionMode getMode() {
+        return distributionMode;
+    }
+
+    public void close() {
+        switch (distributionMode) {
+            case INACTIVE:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("FrameDistributor is " + distributionMode);
+                }
+                break;
+            case SINGLE:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Disconnecting single frame reader in " + distributionMode + " mode " + " for  feedId "
+                            + feedId + " " + this.feedRuntimeType);
+                }
+                setMode(DistributionMode.INACTIVE);
+                if (!enableSynchronousTransfer) {
+                    notifyEndOfFeed(); // send EOD Data Bucket
+                    waitForCollectorsToFinish();
+                }
+                registeredCollectors.values().iterator().next().disconnect();
+                break;
+            case SHARED:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Signalling End Of Feed; currently operating in " + distributionMode + " mode");
+                }
+                notifyEndOfFeed(); // send EOD Data Bucket
+                waitForCollectorsToFinish();
+                break;
+        }
+    }
+
+    private void waitForCollectorsToFinish() {
+        synchronized (registeredCollectors.values()) {
+            while (!allCollectorsFinished()) {
+                try {
+                    registeredCollectors.values().wait();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private boolean allCollectorsFinished() {
+        boolean allFinished = true;
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            allFinished = allFinished && collector.getState().equals(FeedFrameCollector.State.FINISHED);
+        }
+        return allFinished;
+    }
+
+    public Collection<FeedFrameCollector> getRegisteredCollectors() {
+        return registeredCollectors.values();
+    }
+
+    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
+        return registeredCollectors;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public DistributionMode getDistributionMode() {
+        return distributionMode;
+    }
+
+    public FeedRuntimeType getFeedRuntimeType() {
+        return feedRuntimeType;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public FrameTupleAccessor getFta() {
+        return fta;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java
new file mode 100644
index 0000000..780a332
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FrameEventCallback implements IFrameEventCallback {
+
+    private static final Logger LOGGER = Logger.getLogger(FrameEventCallback.class.getName());
+
+    private final FeedPolicyAccessor fpa;
+    private final FeedRuntimeInputHandler inputSideHandler;
+    private IFrameWriter coreOperator;
+
+    public FrameEventCallback(FeedPolicyAccessor fpa, FeedRuntimeInputHandler inputSideHandler,
+            IFrameWriter coreOperator) {
+        this.fpa = fpa;
+        this.inputSideHandler = inputSideHandler;
+        this.coreOperator = coreOperator;
+    }
+
+    @Override
+    public void frameEvent(FrameEvent event) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Frame Event for " + inputSideHandler.getRuntimeId() + " " + event);
+        }
+        if (!event.equals(FrameEvent.FINISHED_PROCESSING_SPILLAGE)
+                && inputSideHandler.getMode().equals(Mode.PROCESS_SPILL)) {
+            return;
+        }
+        switch (event) {
+            case PENDING_WORK_THRESHOLD_REACHED:
+                if (fpa.spillToDiskOnCongestion()) {
+                    inputSideHandler.setMode(Mode.SPILL);
+                } else if (fpa.discardOnCongestion()) {
+                    inputSideHandler.setMode(Mode.DISCARD);
+                } else if (fpa.throttlingEnabled()) {
+                    inputSideHandler.setThrottlingEnabled(true);
+                } else {
+                    try {
+                        inputSideHandler.reportUnresolvableCongestion();
+                    } catch (HyracksDataException e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to report congestion!!!");
+                        }
+                    }
+                }
+                break;
+            case FINISHED_PROCESSING:
+                inputSideHandler.setFinished(true);
+                synchronized (coreOperator) {
+                    coreOperator.notifyAll();
+                }
+                break;
+            case PENDING_WORK_DONE:
+                switch (inputSideHandler.getMode()) {
+                    case SPILL:
+                    case DISCARD:
+                    case POST_SPILL_DISCARD:
+                        inputSideHandler.setMode(Mode.PROCESS);
+                        break;
+                    default:
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Received " + event + " ignoring as operating in " + inputSideHandler.getMode());
+                        }
+                }
+                break;
+            case FINISHED_PROCESSING_SPILLAGE:
+                inputSideHandler.setMode(Mode.PROCESS);
+                break;
+            default:
+                break;
+        }
+    }
+
+    public void setCoreOperator(IFrameWriter coreOperator) {
+        this.coreOperator = coreOperator;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java
deleted file mode 100644
index 6cdc45c..0000000
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.common.feeds;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-
-/**
- * Handle (de)registration of feeds for delivery of control messages.
- */
-public interface IFeedManager {
-
-    public static final long SOCKET_CONNECT_TIMEOUT = 5000;
-
-    /**
-     * Returns the executor service associated with the feed.
-     * 
-     * @param feedId
-     * @return
-     */
-    public ExecutorService getFeedExecutorService(FeedConnectionId feedId);
-
-    /**
-     * Allows registration of a feedRuntime.
-     * 
-     * @param feedRuntime
-     * @throws Exception
-     */
-    public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
-
-    /**
-     * Allows de-registration of a feed runtime.
-     * 
-     * @param feedRuntimeId
-     */
-    public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId);
-
-    /**
-     * Obtain feed runtime corresponding to a feedRuntimeId
-     * 
-     * @param feedRuntimeId
-     * @return
-     */
-    public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId);
-
-    /**
-     * Register the Super Feed Manager associated witht a feed.
-     * 
-     * @param feedId
-     * @param sfm
-     * @throws Exception
-     */
-    public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception;
-
-    /**
-     * Obtain a handle to the Super Feed Manager associated with the feed.
-     * 
-     * @param feedId
-     * @return
-     */
-    public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId);
-
-    /**
-     * De-register a feed
-     * 
-     * @param feedId
-     * @throws IOException
-     */
-    void deregisterFeed(FeedConnectionId feedId);
-
-    /**
-     * Obtain the feed runtime manager associated with a feed.
-     * 
-     * @param feedId
-     * @return
-     */
-    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId);
-
-    /**
-     * Obtain a handle to the feed Message service associated with a feed.
-     * 
-     * @param feedId
-     * @return
-     */
-    public FeedMessageService getFeedMessageService(FeedConnectionId feedId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java
new file mode 100644
index 0000000..3bda2db
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public interface IFramePostProcessor {
+
+    public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java
new file mode 100644
index 0000000..468c13c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+
+public interface IFramePreprocessor {
+
+    public void preProcess(ByteBuffer frame) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java
new file mode 100644
index 0000000..b7eb3b8
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.logging.Level;
+
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class IngestionRuntime extends SubscribableRuntime {
+
+    private final IAdapterRuntimeManager adapterRuntimeManager;
+
+    public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
+            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
+        super(feedId, runtimeId, null, feedWriter, recordDesc);
+        this.adapterRuntimeManager = adaptorRuntimeManager;
+    }
+
+    public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception {
+        FeedFrameCollector reader = dWriter.subscribeFeed(fpa, collectionRuntime.getInputHandler(),
+                collectionRuntime.getConnectionId());
+        collectionRuntime.setFrameCollector(reader);
+        
+        if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+            adapterRuntimeManager.start();
+        }
+        subscribers.add(collectionRuntime);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
+        }
+    }
+
+    public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+        dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
+        }
+        if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.INACTIVE)) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Stopping adapter for " + this + " as no more registered collectors");
+            }
+            adapterRuntimeManager.stop();
+        }
+        subscribers.remove(collectionRuntime);
+    }
+
+    public void endOfFeed() {
+        dWriter.notifyEndOfFeed();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Notified End Of Feed  [" + this + "]");
+        }
+    }
+
+    public IAdapterRuntimeManager getAdapterRuntimeManager() {
+        return adapterRuntimeManager;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java
new file mode 100644
index 0000000..656797e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.BitSet;
+
+public class IntakePartitionStatistics {
+
+	public static int ACK_WINDOW_SIZE = 1024;
+	private int partition;
+	private int base;
+	private BitSet bitSet;
+
+	public IntakePartitionStatistics(int partition, int base) {
+		this.partition = partition;
+		this.base = base;
+		this.bitSet = new BitSet(ACK_WINDOW_SIZE);
+	}
+
+	public void ackRecordId(int recordId) {
+		int posIndexWithinBase = recordId % ACK_WINDOW_SIZE;
+		this.bitSet.set(posIndexWithinBase);
+	}
+
+	public byte[] getAckInfo() {
+		return bitSet.toByteArray();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java
new file mode 100644
index 0000000..fdd6ec4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class IntakeSideMonitoredBuffer extends MonitoredBuffer {
+
+    public IntakeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta,  recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+    }
+
+    @Override
+    protected boolean monitorProcessingRate() {
+        return false;
+    }
+
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return false;
+    }
+
+    @Override
+    protected IFramePreprocessor getFramePreProcessor() {
+        return null;
+    }
+
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return null;
+    }
+
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return false;
+    }
+
+    @Override
+    protected boolean reportOutflowRate() {
+        return false;
+    }
+
+    @Override
+    protected boolean reportInflowRate() {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
index 7beb212..2e21ea7 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
@@ -43,15 +43,12 @@ public class MessageListener {
 
     public void stop() {
         listenerServer.stop();
-        System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
         if (!executorService.isShutdown()) {
             executorService.shutdownNow();
         }
-
     }
 
     public void start() throws IOException {
-        System.out.println("STARTING MESSAGE RECEIVING SERVICE AT " + port);
         listenerServer = new MessageListenerServer(port, outbox);
         executorService.execute(listenerServer);
     }
@@ -62,6 +59,8 @@ public class MessageListener {
         private final LinkedBlockingQueue<String> outbox;
         private ServerSocket server;
 
+        private static final char EOL = (char) "\n".getBytes()[0];
+
         public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
             this.port = port;
             this.outbox = outbox;
@@ -77,7 +76,6 @@ public class MessageListener {
 
         @Override
         public void run() {
-            char EOL = (char) "\n".getBytes()[0];
             Socket client = null;
             try {
                 server = new ServerSocket(port);
@@ -121,59 +119,4 @@ public class MessageListener {
 
     }
 
-    private static class MessageParser implements Runnable {
-
-        private Socket client;
-        private IMessageAnalyzer messageAnalyzer;
-        private static final char EOL = (char) "\n".getBytes()[0];
-
-        public MessageParser(Socket client, IMessageAnalyzer messageAnalyzer) {
-            this.client = client;
-            this.messageAnalyzer = messageAnalyzer;
-        }
-
-        @Override
-        public void run() {
-            CharBuffer buffer = CharBuffer.allocate(5000);
-            char ch;
-            try {
-                InputStream in = client.getInputStream();
-                while (true) {
-                    ch = (char) in.read();
-                    if (((int) ch) == -1) {
-                        break;
-                    }
-                    while (ch != EOL) {
-                        buffer.put(ch);
-                        ch = (char) in.read();
-                    }
-                    buffer.flip();
-                    String s = new String(buffer.array());
-                    synchronized (messageAnalyzer) {
-                        messageAnalyzer.getMessageQueue().add(s + "\n");
-                    }
-                    buffer.position(0);
-                    buffer.limit(5000);
-                }
-            } catch (IOException ioe) {
-                ioe.printStackTrace();
-            } finally {
-                try {
-                    client.close();
-                } catch (IOException ioe) {
-                    // do nothing
-                }
-            }
-        }
-    }
-
-    public static interface IMessageAnalyzer {
-
-        /**
-         * @return
-         */
-        public LinkedBlockingQueue<String> getMessageQueue();
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java
new file mode 100644
index 0000000..2cb1066
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IMessageReceiver;
+
+public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
+
+    protected static final Logger LOGGER = Logger.getLogger(MessageReceiver.class.getName());
+
+    protected final LinkedBlockingQueue<T> inbox;
+    protected ExecutorService executor;
+
+    public MessageReceiver() {
+        inbox = new LinkedBlockingQueue<T>();
+    }
+
+    public abstract void processMessage(T message) throws Exception;
+
+    @Override
+    public void start() {
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(new MessageReceiverRunnable<T>(this));
+    }
+
+    @Override
+    public synchronized void sendMessage(T message) {
+        inbox.add(message);
+    }
+
+    @Override
+    public void close(boolean processPending) {
+        if (executor != null) {
+            executor.shutdown();
+            executor = null;
+            if (processPending) {
+                flushPendingMessages();
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Will discard the pending frames " + inbox.size());
+                }
+            }
+        }
+    }
+
+    private static class MessageReceiverRunnable<T> implements Runnable {
+
+        private final LinkedBlockingQueue<T> inbox;
+        private final MessageReceiver<T> messageReceiver;
+
+        public MessageReceiverRunnable(MessageReceiver<T> messageReceiver) {
+            this.inbox = messageReceiver.inbox;
+            this.messageReceiver = messageReceiver;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    T message = inbox.take();
+                    messageReceiver.processMessage(message);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    protected void flushPendingMessages() {
+        while (!inbox.isEmpty()) {
+            T message = null;
+            try {
+                message = inbox.take();
+                processMessage(message);
+            } catch (InterruptedException ie) {
+                // ignore exception but break from the loop
+                break;
+            } catch (Exception e) {
+                e.printStackTrace();
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Exception " + e + " in processing message " + message);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java
new file mode 100644
index 0000000..90e340f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java
@@ -0,0 +1,386 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.LogInputOutputRateTask;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
+
+    protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds
+    protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second
+    protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds
+
+    protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 10th frame
+
+    protected final IHyracksTaskContext ctx;
+    protected final FeedConnectionId connectionId;
+    protected final FeedRuntimeId runtimeId;
+    protected final FrameTupleAccessor inflowFta;
+    protected final FrameTupleAccessor outflowFta;
+    protected final FeedRuntimeInputHandler inputHandler;
+    protected final IFrameEventCallback callback;
+    protected final Timer timer;
+    private final RecordDescriptor recordDesc;
+    private final IExceptionHandler exceptionHandler;
+    protected final FeedPolicyAccessor policyAccessor;
+    protected int nPartitions;
+
+    private IFrameWriter frameWriter;
+    protected IFeedMetricCollector metricCollector;
+    protected boolean monitorProcessingRate = false;
+    protected boolean monitorInputQueueLength = false;
+    protected boolean logInflowOutflowRate = false;
+    protected boolean reportOutflowRate = false;
+    protected boolean reportInflowRate = false;
+
+    protected int inflowReportSenderId = -1;
+    protected int outflowReportSenderId = -1;
+    protected TimerTask monitorInputQueueLengthTask;
+    protected TimerTask processingRateTask;
+    protected TimerTask logInflowOutflowRateTask;
+    protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
+    protected StorageFrameHandler storageFromeHandler;
+
+    protected int processingRate = -1;
+    protected int frameCount = 0;
+    private long avgDelayPersistence = 0;
+    private boolean active;
+    private Map<Integer, Long> tupleTimeStats;
+    IFramePostProcessor postProcessor = null;
+    IFramePreprocessor preProcessor = null;
+
+    public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        switch (runtimeId.getFeedRuntimeType()) {
+            case COMPUTE:
+                return new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+            case STORE:
+                return new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+            case COLLECT:
+                return new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+            default:
+                return new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+        }
+    }
+
+    protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+        this.ctx = ctx;
+        this.connectionId = connectionId;
+        this.frameWriter = frameWriter;
+        this.inflowFta = new FrameTupleAccessor(recordDesc);
+        this.outflowFta = new FrameTupleAccessor(recordDesc);
+        this.runtimeId = runtimeId;
+        this.metricCollector = metricCollector;
+        this.exceptionHandler = exceptionHandler;
+        this.callback = callback;
+        this.inputHandler = inputHandler;
+        this.timer = new Timer();
+        this.recordDesc = recordDesc;
+        this.policyAccessor = policyAccessor;
+        this.nPartitions = nPartitions;
+        this.active = true;
+        initializeMonitoring();
+    }
+
+    protected abstract boolean monitorProcessingRate();
+
+    protected abstract boolean logInflowOutflowRate();
+
+    protected abstract boolean reportOutflowRate();
+
+    protected abstract boolean reportInflowRate();
+
+    protected abstract boolean monitorInputQueueLength();
+
+    protected abstract IFramePreprocessor getFramePreProcessor();
+
+    protected abstract IFramePostProcessor getFramePostProcessor();
+
+    protected void initializeMonitoring() {
+        monitorProcessingRate = monitorProcessingRate();
+        monitorInputQueueLength = monitorInputQueueLength();
+        reportInflowRate = reportInflowRate();
+        reportOutflowRate = reportOutflowRate();
+        logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
+
+        if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
+            this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
+                    connectionId, nPartitions);
+            this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
+        }
+
+        if (monitorInputQueueLength
+                && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
+                        || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
+            this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
+            this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
+        }
+
+        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+            this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
+                    reportOutflowRate);
+            this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
+            this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+                    ValueType.INFLOW_RATE, MetricType.RATE);
+            this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+                    ValueType.OUTFLOW_RATE, MetricType.RATE);
+        }
+    }
+
+    protected void deinitializeMonitoring() {
+        if (monitorInputQueueLengthTask != null) {
+            monitorInputQueueLengthTask.cancel();
+        }
+        if (processingRateTask != null) {
+            processingRateTask.cancel();
+        }
+        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+            metricCollector.removeReportSender(inflowReportSenderId);
+            metricCollector.removeReportSender(outflowReportSenderId);
+            logInflowOutflowRateTask.cancel();
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Disabled monitoring for " + this.runtimeId);
+        }
+    }
+
+    protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
+        if (monitorProcessingRate) {
+            frameCount++;
+            if (frameCount % PROCESS_RATE_REFRESH == 0) {
+                long endTime = System.currentTimeMillis();
+                processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / (endTime - startTime));
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
+                }
+                frameCount = 0;
+            }
+        }
+
+        if (logInflowOutflowRate || reportOutflowRate) {
+            metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
+        }
+
+        postProcessFrame(frame);
+
+    }
+
+    protected void preProcessFrame(ByteBuffer frame) throws Exception {
+        if (postProcessor == null) {
+            preProcessor = getFramePreProcessor();
+        }
+        if (preProcessor != null) {
+            preProcessor.preProcess(frame);
+        }
+    }
+
+    protected void postProcessFrame(ByteBuffer frame) throws Exception {
+        if (postProcessor == null) {
+            postProcessor = getFramePostProcessor();
+        }
+        if (postProcessor != null) {
+            outflowFta.reset(frame);
+            postProcessor.postProcessFrame(frame, outflowFta);
+        }
+    }
+
+    @Override
+    public void sendMessage(DataBucket message) {
+        inbox.add(message);
+    }
+
+    public void sendReport(ByteBuffer frame) {
+        if ((logInflowOutflowRate || reportInflowRate)
+                && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG) || inputHandler.getMode().equals(
+                        Mode.PROCESS_SPILL))) {
+            inflowFta.reset(frame);
+            metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
+        }
+    }
+
+    /** return rate in terms of tuples/sec **/
+    public int getInflowRate() {
+        return metricCollector.getMetric(inflowReportSenderId);
+    }
+
+    /** return rate in terms of tuples/sec **/
+    public int getOutflowRate() {
+        return metricCollector.getMetric(outflowReportSenderId);
+    }
+
+    /** return the number of pending frames from the input queue **/
+    public int getWorkSize() {
+        return inbox.size();
+    }
+
+    /** reset the number of partitions (cardinality) for the runtime **/
+    public void setNumberOfPartitions(int nPartitions) {
+        if (processingRateTask != null) {
+            int currentPartitions = ((MonitoreProcessRateTimerTask) processingRateTask).getNumberOfPartitions();
+            if (currentPartitions != nPartitions) {
+                ((MonitoreProcessRateTimerTask) processingRateTask).setNumberOfPartitions(nPartitions);
+            }
+        }
+    }
+
+    public FeedRuntimeInputHandler getInputHandler() {
+        return inputHandler;
+    }
+
+    public synchronized void close(boolean processPending, boolean disableMonitoring) {
+        super.close(processPending);
+        if (disableMonitoring) {
+            deinitializeMonitoring();
+        }
+        active = false;
+    }
+
+    @Override
+    public synchronized void processMessage(DataBucket message) throws Exception {
+        if (!active) {
+            message.doneReading();
+            return;
+        }
+        switch (message.getContentType()) {
+            case DATA:
+                boolean finishedProcessing = false;
+                ByteBuffer frameReceived = message.getContent();
+                ByteBuffer frameToProcess = null;
+                if (inputHandler.isThrottlingEnabled()) {
+                    inflowFta.reset(frameReceived);
+                    int pRate = getProcessingRate();
+                    int inflowRate = getInflowRate();
+                    if (inflowRate > pRate) {
+                        double retainFraction = (pRate * 0.8 / inflowRate);
+                        frameToProcess = throttleFrame(inflowFta, retainFraction);
+                        inflowFta.reset(frameToProcess);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Throttling at fraction " + retainFraction + "inflow rate " + inflowRate
+                                    + " no of tuples remaining " + inflowFta.getTupleCount());
+
+                        }
+                    } else {
+                        frameToProcess = frameReceived;
+                    }
+                } else {
+                    frameToProcess = frameReceived;
+                }
+                outflowFta.reset(frameToProcess);
+                long startTime = 0;
+                while (!finishedProcessing) {
+                    try {
+                        inflowFta.reset(frameToProcess);
+                        startTime = System.currentTimeMillis();
+                        preProcessFrame(frameToProcess);
+                        frameWriter.nextFrame(frameToProcess);
+                        postProcessFrame(startTime, frameToProcess);
+                        finishedProcessing = true;
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        frameToProcess = exceptionHandler.handleException(e, frameToProcess);
+                        finishedProcessing = true;
+                    }
+                }
+                message.doneReading();
+                break;
+            case EOD:
+                message.doneReading();
+                timer.cancel();
+                callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
+                break;
+            case EOSD:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Done processing spillage");
+                }
+                message.doneReading();
+                callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
+                break;
+
+        }
+    }
+
+    private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
+        int desiredTuples = (int) (fta.getTupleCount() * retainFraction);
+        return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
+    }
+
+    public Mode getMode() {
+        return inputHandler.getMode();
+    }
+
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    public void setFrameWriter(IFrameWriter frameWriter) {
+        this.frameWriter = frameWriter;
+    }
+
+    public void reset() {
+        active = true;
+        if (logInflowOutflowRate) {
+            metricCollector.resetReportSender(inflowReportSenderId);
+            metricCollector.resetReportSender(outflowReportSenderId);
+        }
+    }
+
+    public int getProcessingRate() {
+        return processingRate;
+    }
+
+    public Map<Integer, Long> getTupleTimeStats() {
+        return tupleTimeStats;
+    }
+
+    public long getAvgDelayRecordPersistence() {
+        return avgDelayPersistence;
+    }
+
+    public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
+        return storageTimeTrackingRateTask;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java
new file mode 100644
index 0000000..13c979f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java
@@ -0,0 +1,290 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.StorageReportFeedMessage;
+
+public class MonitoredBufferTimerTasks {
+
+    private static final Logger LOGGER = Logger.getLogger(MonitorInputQueueLengthTimerTask.class.getName());
+
+    public static class MonitoredBufferStorageTimerTask extends TimerTask {
+
+        private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
+
+        private final StorageSideMonitoredBuffer mBuffer;
+        private final IFeedManager feedManager;
+        private final int partition;
+        private final FeedConnectionId connectionId;
+        private final FeedPolicyAccessor policyAccessor;
+        private final StorageFrameHandler storageFromeHandler;
+        private final StorageReportFeedMessage storageReportMessage;
+        private final FeedTupleCommitAckMessage tupleCommitAckMessage;
+
+        private Map<Integer, Integer> maxIntakeBaseCovered;
+        private int countDelayExceeded = 0;
+
+        public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
+                FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
+                StorageFrameHandler storageFromeHandler) {
+            this.mBuffer = mBuffer;
+            this.feedManager = feedManager;
+            this.connectionId = connectionId;
+            this.partition = partition;
+            this.policyAccessor = policyAccessor;
+            this.storageFromeHandler = storageFromeHandler;
+            this.storageReportMessage = new StorageReportFeedMessage(this.connectionId, this.partition, 0, false, 0, 0);
+            this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(this.connectionId, 0, 0, null);
+            this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
+        }
+
+        @Override
+        public void run() {
+            if (mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled()) {
+                ackRecords();
+            }
+            if (mBuffer.isTimeTrackingEnabled()) {
+                checkLatencyViolation();
+            }
+        }
+
+        private void ackRecords() {
+            Set<Integer> partitions = storageFromeHandler.getPartitionsWithStats();
+            List<Integer> basesCovered = new ArrayList<Integer>();
+            for (int intakePartition : partitions) {
+                Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
+                        .getBaseAcksForPartition(intakePartition);
+                for (Entry<Integer, IntakePartitionStatistics> entry : baseAcks.entrySet()) {
+                    int base = entry.getKey();
+                    IntakePartitionStatistics stats = entry.getValue();
+                    Integer maxIntakeBaseForPartition = maxIntakeBaseCovered.get(intakePartition);
+                    if (maxIntakeBaseForPartition == null || maxIntakeBaseForPartition < base) {
+                        tupleCommitAckMessage.reset(intakePartition, base, stats.getAckInfo());
+                        feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
+                    } else {
+                        basesCovered.add(base);
+                    }
+                }
+                for (Integer b : basesCovered) {
+                    baseAcks.remove(b);
+                }
+                basesCovered.clear();
+            }
+        }
+
+        private void checkLatencyViolation() {
+            long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
+            if (avgDelayPersistence > policyAccessor.getMaxDelayRecordPersistence()) {
+                countDelayExceeded++;
+                if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
+                    storageReportMessage.reset(0, false, mBuffer.getAvgDelayRecordPersistence());
+                    feedManager.getFeedMessageService().sendMessage(storageReportMessage);
+                }
+            } else {
+                countDelayExceeded = 0;
+            }
+        }
+
+        public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
+            maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
+        }
+    }
+
+    public static class LogInputOutputRateTask extends TimerTask {
+
+        private final MonitoredBuffer mBuffer;
+        private final boolean log;
+        private final boolean reportInflow;
+        private final boolean reportOutflow;
+
+        private final IFeedMessageService messageService;
+        private final FeedReportMessage message;
+
+        public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
+            this.mBuffer = mBuffer;
+            this.log = log;
+            this.reportInflow = reportInflow;
+            this.reportOutflow = reportOutflow;
+            if (reportInflow || reportOutflow) {
+                ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
+                messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
+                message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
+                        vType, 0);
+            } else {
+                messageService = null;
+                message = null;
+            }
+
+        }
+
+        @Override
+        public void run() {
+            int pendingWork = mBuffer.getWorkSize();
+            int outflowRate = mBuffer.getOutflowRate();
+            int inflowRate = mBuffer.getInflowRate();
+            if (log) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
+                            + outflowRate + " Pending Work " + pendingWork);
+                }
+            }
+            if (reportInflow) {
+                message.reset(inflowRate);
+            } else if (reportOutflow) {
+                message.reset(outflowRate);
+            }
+            messageService.sendMessage(message);
+        }
+    }
+
+    public static class MonitorInputQueueLengthTimerTask extends TimerTask {
+
+        private final MonitoredBuffer mBuffer;
+        private final IFrameEventCallback callback;
+        private final int pendingWorkThreshold;
+        private final int maxSuccessiveThresholdPeriods;
+        private FrameEvent lastEvent = FrameEvent.NO_OP;
+        private int pendingWorkExceedCount = 0;
+
+        public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
+            this.mBuffer = mBuffer;
+            this.callback = callback;
+            AsterixFeedProperties props = mBuffer.getInputHandler().getFeedManager().getAsterixFeedProperties();
+            pendingWorkThreshold = props.getPendingWorkThreshold();
+            maxSuccessiveThresholdPeriods = props.getMaxSuccessiveThresholdPeriod();
+        }
+
+        @Override
+        public void run() {
+            int pendingWork = mBuffer.getWorkSize();
+            if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
+                return;
+            }
+
+            switch (lastEvent) {
+                case NO_OP:
+                case PENDING_WORK_DONE:
+                case FINISHED_PROCESSING_SPILLAGE:
+                    if (pendingWork > pendingWorkThreshold) {
+                        pendingWorkExceedCount++;
+                        if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
+                            pendingWorkExceedCount = 0;
+                            lastEvent = FrameEvent.PENDING_WORK_THRESHOLD_REACHED;
+                            callback.frameEvent(lastEvent);
+                        }
+                    } else if (pendingWork == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
+                        lastEvent = FrameEvent.PENDING_WORK_DONE;
+                        callback.frameEvent(lastEvent);
+                    }
+                    break;
+                case PENDING_WORK_THRESHOLD_REACHED:
+                    if (((pendingWork * 1.0) / pendingWorkThreshold) <= 0.5) {
+                        lastEvent = FrameEvent.PENDING_WORK_DONE;
+                        callback.frameEvent(lastEvent);
+                    }
+                    break;
+                case FINISHED_PROCESSING:
+                    break;
+
+            }
+        }
+    }
+
+    /**
+     * A timer task to measure and compare the processing rate and inflow rate
+     * to look for possibility to scale-in, that is reduce the degree of cardinality
+     * of the compute operator.
+     */
+    public static class MonitoreProcessRateTimerTask extends TimerTask {
+
+        private final MonitoredBuffer mBuffer;
+        private final IFeedManager feedManager;
+        private int nPartitions;
+        private ScaleInReportMessage sMessage;
+        private boolean proposedChange;
+
+        public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
+                FeedConnectionId connectionId, int nPartitions) {
+            this.mBuffer = mBuffer;
+            this.feedManager = feedManager;
+            this.nPartitions = nPartitions;
+            this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
+            this.proposedChange = false;
+        }
+
+        public int getNumberOfPartitions() {
+            return nPartitions;
+        }
+
+        public void setNumberOfPartitions(int nPartitions) {
+            this.nPartitions = nPartitions;
+            proposedChange = false;
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
+            }
+        }
+
+        @Override
+        public void run() {
+            if (!proposedChange) {
+                int inflowRate = mBuffer.getInflowRate();
+                int procRate = mBuffer.getProcessingRate();
+                if (inflowRate > 0 && procRate > 0) {
+                    if (inflowRate < procRate) {
+                        int possibleCardinality = (int) Math.ceil(nPartitions * inflowRate / (double) procRate);
+                        if (possibleCardinality < nPartitions
+                                && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= 0.25)) {
+                            sMessage.reset(nPartitions, possibleCardinality);
+                            feedManager.getFeedMessageService().sendMessage(sMessage);
+                            proposedChange = true;
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Proposed scale-in " + sMessage);
+                            }
+                        }
+                    } else {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
+                                    + ")");
+                        }
+                    }
+                }
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java
new file mode 100644
index 0000000..66e5a40
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+public class NodeLoad implements Comparable<NodeLoad> {
+
+    private final String nodeId;
+
+    private int nRuntimes;
+
+    public NodeLoad(String nodeId) {
+        this.nodeId = nodeId;
+        this.nRuntimes = 0;
+    }
+
+    public void addLoad() {
+        nRuntimes++;
+    }
+
+    public void removeLoad(FeedRuntimeType runtimeType) {
+        nRuntimes--;
+    }
+
+    @Override
+    public int compareTo(NodeLoad o) {
+        if (this == o) {
+            return 0;
+        }
+        return nRuntimes - o.getnRuntimes();
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public int getnRuntimes() {
+        return nRuntimes;
+    }
+
+    public void setnRuntimes(int nRuntimes) {
+        this.nRuntimes = nRuntimes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java
new file mode 100644
index 0000000..8257143
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class NodeLoadReport implements Comparable<NodeLoadReport> {
+
+    private final String nodeId;
+    private float cpuLoad;
+    private double usedHeap;
+    private int nRuntimes;
+
+    public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
+        this.nodeId = nodeId;
+        this.cpuLoad = cpuLoad;
+        this.usedHeap = usedHeap;
+        this.nRuntimes = nRuntimes;
+    }
+
+    public static NodeLoadReport read(JSONObject obj) throws JSONException {
+        NodeLoadReport r = new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
+                (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
+                (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
+                obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
+        return r;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof NodeLoadReport)) {
+            return false;
+        }
+        return ((NodeLoadReport) o).nodeId.equals(nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return nodeId.hashCode();
+    }
+
+    @Override
+    public int compareTo(NodeLoadReport o) {
+        if (nRuntimes != o.getnRuntimes()) {
+            return nRuntimes - o.getnRuntimes();
+        } else {
+            return (int) (this.cpuLoad - ((NodeLoadReport) o).cpuLoad);
+        }
+    }
+
+    public float getCpuLoad() {
+        return cpuLoad;
+    }
+
+    public void setCpuLoad(float cpuLoad) {
+        this.cpuLoad = cpuLoad;
+    }
+
+    public double getUsedHeap() {
+        return usedHeap;
+    }
+
+    public void setUsedHeap(double usedHeap) {
+        this.usedHeap = usedHeap;
+    }
+
+    public int getnRuntimes() {
+        return nRuntimes;
+    }
+
+    public void setnRuntimes(int nRuntimes) {
+        this.nRuntimes = nRuntimes;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+}