You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:06 UTC

[12/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
new file mode 100644
index 0000000..fc06ab9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.feed.watch;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Describes one feed activity: the dataverse, feed and dataset names, a
+ * numeric activity id and a map of string-valued details.
+ * <p>
+ * Equality and hashing cover the three names and the activity id; the details
+ * map is not part of an activity's identity. Because the activity id is
+ * mutable, do not call {@link #setActivityId(int)} on an instance that is
+ * already stored in a hash-based or sorted collection.
+ * <p>
+ * The natural ordering looks at the activity id only (highest id first), so it
+ * is not consistent with {@link #equals(Object)}.
+ */
+public class FeedActivity implements Comparable<FeedActivity> {
+
+    private int activityId;
+
+    private final String dataverseName;
+    private final String datasetName;
+    private final String feedName;
+    private final Map<String, String> feedActivityDetails;
+
+    /**
+     * String keys for the details map. Only {@link #FEED_CONNECT_TIMESTAMP} is
+     * read by {@link FeedActivity} itself.
+     */
+    public static class FeedActivityDetails {
+        public static final String INTAKE_LOCATIONS = "intake-locations";
+        public static final String COMPUTE_LOCATIONS = "compute-locations";
+        public static final String STORAGE_LOCATIONS = "storage-locations";
+        public static final String COLLECT_LOCATIONS = "collect-locations";
+        public static final String FEED_POLICY_NAME = "feed-policy-name";
+        public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
+    }
+
+    /**
+     * Creates an activity whose id is initially {@code 0}.
+     *
+     * @param dataverseName
+     *            name of the dataverse
+     * @param feedName
+     *            name of the feed
+     * @param datasetName
+     *            name of the dataset
+     * @param feedActivityDetails
+     *            details map; stored by reference, not copied
+     */
+    public FeedActivity(String dataverseName, String feedName, String datasetName,
+            Map<String, String> feedActivityDetails) {
+        this.dataverseName = dataverseName;
+        this.feedName = feedName;
+        this.datasetName = datasetName;
+        this.feedActivityDetails = feedActivityDetails;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    /**
+     * Two activities are equal when their dataverse, dataset and feed names
+     * and their activity ids all match. Null names compare equal to each other.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof FeedActivity)) {
+            return false;
+        }
+        FeedActivity that = (FeedActivity) other;
+        return activityId == that.activityId && Objects.equals(dataverseName, that.dataverseName)
+                && Objects.equals(datasetName, that.datasetName) && Objects.equals(feedName, that.feedName);
+    }
+
+    /** Hashes exactly the fields compared by {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataverseName, feedName, datasetName, activityId);
+    }
+
+    @Override
+    public String toString() {
+        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
+    }
+
+    /**
+     * @return the value stored under
+     *         {@link FeedActivityDetails#FEED_CONNECT_TIMESTAMP}, or
+     *         {@code null} when there is no details map or no such entry
+     */
+    public String getConnectTimestamp() {
+        if (feedActivityDetails == null) {
+            return null;
+        }
+        return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
+    }
+
+    public int getActivityId() {
+        return activityId;
+    }
+
+    public void setActivityId(int activityId) {
+        this.activityId = activityId;
+    }
+
+    /**
+     * @return the details map passed to the constructor (the same instance, not
+     *         a copy)
+     */
+    public Map<String, String> getFeedActivityDetails() {
+        return feedActivityDetails;
+    }
+
+    /**
+     * Orders activities by descending activity id.
+     */
+    @Override
+    public int compareTo(FeedActivity o) {
+        // Integer.compare instead of subtraction: the difference of two ints can
+        // overflow and flip the sign of the result.
+        return Integer.compare(o.getActivityId(), this.activityId);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
new file mode 100644
index 0000000..3e42169
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * {@link FeedJobInfo} for a job of type {@code FEED_CONNECT}. In addition to
+ * the base job data it holds the connection id, the feed policy map, the
+ * source and compute feed joints, and three location lists.
+ */
+public class FeedConnectJobInfo extends FeedJobInfo {
+
+    // Assigned once by the constructor.
+    private final FeedConnectionId connectionId;
+    private final IFeedJoint sourceFeedJoint;
+    private final Map<String, String> feedPolicy;
+
+    // Assigned by the constructor, replaceable through its setter.
+    private IFeedJoint computeFeedJoint;
+
+    // Not assigned by the constructor; null until the matching setter is called.
+    private List<String> collectLocations;
+    private List<String> computeLocations;
+    private List<String> storageLocations;
+
+    /**
+     * Creates the info object; the job type passed to the base class is always
+     * {@code FEED_CONNECT}. All references are stored as given, without copying.
+     */
+    public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
+            IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
+            Map<String, String> feedPolicy) {
+        super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
+        this.connectionId = connectionId;
+        this.feedPolicy = feedPolicy;
+        this.sourceFeedJoint = sourceFeedJoint;
+        this.computeFeedJoint = computeFeedJoint;
+    }
+
+    /** @return the connection id given at construction */
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /** @return the feed policy map given at construction (same instance) */
+    public Map<String, String> getFeedPolicy() {
+        return feedPolicy;
+    }
+
+    /** @return the source feed joint given at construction */
+    public IFeedJoint getSourceFeedJoint() {
+        return sourceFeedJoint;
+    }
+
+    /** @return the current compute feed joint */
+    public IFeedJoint getComputeFeedJoint() {
+        return computeFeedJoint;
+    }
+
+    /** Replaces the compute feed joint. */
+    public void setComputeFeedJoint(IFeedJoint computeFeedJoint) {
+        this.computeFeedJoint = computeFeedJoint;
+    }
+
+    /** @return the collect locations, or {@code null} if never set */
+    public List<String> getCollectLocations() {
+        return collectLocations;
+    }
+
+    /** Stores the given list by reference. */
+    public void setCollectLocations(List<String> collectLocations) {
+        this.collectLocations = collectLocations;
+    }
+
+    /** @return the compute locations, or {@code null} if never set */
+    public List<String> getComputeLocations() {
+        return computeLocations;
+    }
+
+    /** Stores the given list by reference. */
+    public void setComputeLocations(List<String> computeLocations) {
+        this.computeLocations = computeLocations;
+    }
+
+    /** @return the storage locations, or {@code null} if never set */
+    public List<String> getStorageLocations() {
+        return storageLocations;
+    }
+
+    /** Stores the given list by reference. */
+    public void setStorageLocations(List<String> storageLocations) {
+        this.storageLocations = storageLocations;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
new file mode 100644
index 0000000..3b11811
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedIntakeInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.List;
+
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * {@link FeedJobInfo} for a job of type {@code INTAKE}. In addition to the
+ * base job data it holds the feed id, the intake feed joint and a list of
+ * intake locations.
+ * <p>
+ * The job specification is kept only in the base class, so
+ * {@code getSpec()} and {@code setSpec(...)} (both inherited from
+ * {@link FeedJobInfo}) always read and write the same field. Declaring a
+ * second {@code spec} field here would hide the inherited one and let the two
+ * drift apart after a {@code setSpec(...)} call.
+ */
+public class FeedIntakeInfo extends FeedJobInfo {
+
+    private final FeedId feedId;
+    private final IFeedJoint intakeFeedJoint;
+    // Not assigned by the constructor; null until setIntakeLocation is called.
+    private List<String> intakeLocation;
+
+    /**
+     * @param jobId
+     *            id of the job, passed to the base class
+     * @param state
+     *            initial state, passed to the base class
+     * @param jobType
+     *            ignored; kept so existing callers keep compiling. The job type
+     *            passed to the base class is always {@code INTAKE}
+     * @param feedId
+     *            id of the feed
+     * @param intakeFeedJoint
+     *            the intake feed joint
+     * @param spec
+     *            job specification, passed to the base class
+     */
+    public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
+            JobSpecification spec) {
+        super(jobId, state, FeedJobInfo.JobType.INTAKE, spec);
+        this.feedId = feedId;
+        this.intakeFeedJoint = intakeFeedJoint;
+    }
+
+    /** @return the feed id given at construction */
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    /** @return the intake feed joint given at construction */
+    public IFeedJoint getIntakeFeedJoint() {
+        return intakeFeedJoint;
+    }
+
+    /** @return the intake locations, or {@code null} if never set */
+    public List<String> getIntakeLocation() {
+        return intakeLocation;
+    }
+
+    /** Stores the given list by reference. */
+    public void setIntakeLocation(List<String> intakeLocation) {
+        this.intakeLocation = intakeLocation;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
new file mode 100644
index 0000000..92e00cb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedJobInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Base holder for the data kept about a feed job: its job id, its job type,
+ * its current state and a job specification. The id and type are fixed at
+ * construction; the state and specification can be replaced later.
+ */
+public class FeedJobInfo {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
+
+    /** Kind of feed job described by a {@link FeedJobInfo}. */
+    public enum JobType {
+        INTAKE,
+        FEED_CONNECT
+    }
+
+    /** State values a {@link FeedJobInfo} can be put in. */
+    public enum FeedJobState {
+        CREATED,
+        ACTIVE,
+        UNDER_RECOVERY,
+        ENDED
+    }
+
+    protected final JobId jobId;
+    protected final JobType jobType;
+    protected FeedJobState state;
+    protected JobSpecification spec;
+
+    /**
+     * Stores the four values as given; nothing is copied or validated.
+     */
+    public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
+        this.jobId = jobId;
+        this.state = state;
+        this.jobType = jobType;
+        this.spec = spec;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public FeedJobState getState() {
+        return state;
+    }
+
+    /**
+     * Replaces the state and, when INFO logging is enabled, logs the new state
+     * together with this object's string form.
+     */
+    public void setState(FeedJobState state) {
+        this.state = state;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(this + " is in " + state + " state.");
+        }
+    }
+
+    public JobType getJobType() {
+        return jobType;
+    }
+
+    public JobSpecification getSpec() {
+        return spec;
+    }
+
+    public void setSpec(JobSpecification spec) {
+        this.spec = spec;
+    }
+
+    /** @return the job id followed by the job type in square brackets */
+    @Override
+    public String toString() {
+        return jobId + " [" + jobType + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java
new file mode 100644
index 0000000..f0db639
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedMetricCollector.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+/**
+ * In-memory {@link IFeedMetricCollector}. Callers register a "report sender"
+ * and receive an integer id; values reported under that id are accumulated in
+ * a {@code Series} whose concrete type depends on the sender's metric type.
+ * <p>
+ * Every public method synchronizes on this collector, so the three maps below
+ * are always read and modified under the same lock.
+ */
+public class FeedMetricCollector implements IFeedMetricCollector {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetricCollector.class.getName());
+
+    /** Returned by the {@code getMetric} methods when no value is available. */
+    private static final int UNKNOWN = -1;
+
+    private final AtomicInteger globalSenderId = new AtomicInteger(1);
+    /** Registered senders by id. */
+    private final Map<Integer, Sender> senders = new HashMap<Integer, Sender>();
+    /** Reported values by sender id; an entry is created on the first report. */
+    private final Map<Integer, Series> statHistory = new HashMap<Integer, Series>();
+    /** Registered senders by display name; a later sender with the same name replaces an earlier one. */
+    private final Map<String, Sender> sendersByName = new HashMap<String, Sender>();
+
+    /**
+     * @param nodeId
+     *            currently unused
+     */
+    public FeedMetricCollector(String nodeId) {
+    }
+
+    /**
+     * Registers a new sender and indexes it by id and by display name.
+     *
+     * @return the id assigned to the new sender (ids start at 1 and increase)
+     */
+    @Override
+    public synchronized int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            ValueType valueType, MetricType metricType) {
+        Sender sender = new Sender(globalSenderId.getAndIncrement(), connectionId, runtimeId, valueType, metricType);
+        senders.put(sender.senderId, sender);
+        sendersByName.put(sender.getDisplayName(), sender);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Sender id " + sender.getSenderId() + " created for " + sender);
+        }
+        return sender.senderId;
+    }
+
+    /**
+     * Unregisters a sender and discards its reported values.
+     *
+     * @throws IllegalStateException
+     *             if no sender is registered under {@code senderId}
+     */
+    @Override
+    public synchronized void removeReportSender(int senderId) {
+        Sender sender = senders.remove(senderId);
+        if (sender == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to remove sender Id");
+            }
+            throw new IllegalStateException("Unable to remove sender Id " + senderId + " senders " + senders);
+        }
+        statHistory.remove(senderId);
+        // Also drop the by-name entry, otherwise removed senders accumulate there.
+        // Only do so when the name still maps to this sender: a newer sender with
+        // the same display name may have replaced it.
+        String displayName = sender.getDisplayName();
+        if (sendersByName.get(displayName) == sender) {
+            sendersByName.remove(displayName);
+        }
+    }
+
+    /**
+     * Adds {@code value} to the series of the given sender, creating the series
+     * on the first report.
+     *
+     * @return always {@code true}
+     * @throws IllegalStateException
+     *             if no sender is registered under {@code senderId}, or its
+     *             metric type has no matching series implementation
+     */
+    @Override
+    public synchronized boolean sendReport(int senderId, int value) {
+        Sender sender = senders.get(senderId);
+        if (sender == null) {
+            throw new IllegalStateException("Unable to send report sender Id " + senderId + " senders " + senders);
+        }
+        Series series = statHistory.get(sender.senderId);
+        if (series == null) {
+            series = newSeries(sender.mType);
+            statHistory.put(sender.senderId, series);
+        }
+        series.addValue(value);
+        return true;
+    }
+
+    /** Creates the empty series implementation that matches a metric type. */
+    private static Series newSeries(MetricType metricType) {
+        switch (metricType) {
+            case AVG:
+                return new SeriesAvg();
+            case RATE:
+                return new SeriesRate();
+            default:
+                // Fail here rather than storing null and hitting a NullPointerException later.
+                throw new IllegalStateException("Unsupported metric type " + metricType);
+        }
+    }
+
+    /**
+     * Resets the series of the given sender, if it has one.
+     *
+     * @throws IllegalStateException
+     *             if no sender is registered under {@code senderId}
+     */
+    @Override
+    public synchronized void resetReportSender(int senderId) {
+        Sender sender = senders.get(senderId);
+        if (sender == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Sender with id " + senderId + " not found. Unable to reset!");
+            }
+            throw new IllegalStateException("Unable to reset sender Id " + senderId + " senders " + senders);
+        }
+        Series series = statHistory.get(sender.senderId);
+        if (series != null) {
+            series.reset();
+        }
+    }
+
+    /** A registered report sender: its id, its metric type and its display name. */
+    private static class Sender {
+
+        private final int senderId;
+        private final MetricType mType;
+        private final String displayName;
+
+        public Sender(int senderId, FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
+                MetricType mType) {
+            this.senderId = senderId;
+            this.mType = mType;
+            this.displayName = createDisplayName(connectionId, runtimeId, valueType);
+        }
+
+        @Override
+        public String toString() {
+            return displayName + "[" + senderId + "]" + "(" + mType + ")";
+        }
+
+        /** Senders are equal when their ids are equal. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof Sender)) {
+                return false;
+            }
+            return ((Sender) o).senderId == senderId;
+        }
+
+        @Override
+        public int hashCode() {
+            return senderId;
+        }
+
+        /**
+         * Builds the name used as key in the by-name index from the connection
+         * id, the runtime type and partition of {@code runtimeId}, and the
+         * value type.
+         */
+        public static String createDisplayName(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+                ValueType valueType) {
+            return connectionId + " (" + runtimeId.getFeedRuntimeType() + " )" + "[" + runtimeId.getPartition() + "]"
+                    + "{" + valueType + "}";
+        }
+
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        public int getSenderId() {
+            return senderId;
+        }
+    }
+
+    /**
+     * @return the current metric of the sender with the given id, or
+     *         {@code -1} if the sender is unknown or has not reported yet
+     */
+    @Override
+    public synchronized int getMetric(int senderId) {
+        Sender sender = senders.get(senderId);
+        return getMetric(sender);
+    }
+
+    /**
+     * Looks the sender up by the display name derived from the arguments.
+     *
+     * @return the current metric of that sender, or {@code -1} if the sender
+     *         is unknown or has not reported yet
+     */
+    @Override
+    public synchronized int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType) {
+        String displayName = Sender.createDisplayName(connectionId, runtimeId, valueType);
+        Sender sender = sendersByName.get(displayName);
+        return getMetric(sender);
+    }
+
+    /** Reads the metric for a sender; callers must hold this collector's lock. */
+    private int getMetric(Sender sender) {
+        if (sender == null) {
+            return UNKNOWN;
+        }
+        Series series = statHistory.get(sender.getSenderId());
+        if (series == null) {
+            return UNKNOWN;
+        }
+
+        float result = -1;
+        switch (sender.mType) {
+            case AVG:
+                result = ((SeriesAvg) series).getAvg();
+                break;
+            case RATE:
+                result = ((SeriesRate) series).getRate();
+                break;
+            default:
+                // No series type for this metric; the -1 default is returned.
+                break;
+        }
+        return (int) result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java
new file mode 100644
index 0000000..acfd1fb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakePartitionStatistics.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.BitSet;
+
+/**
+ * Records acknowledged record ids in a bit set. Each id is mapped to the bit
+ * at position {@code recordId mod ACK_WINDOW_SIZE}, so ids that differ by a
+ * multiple of the window size share a bit.
+ */
+public class IntakePartitionStatistics {
+
+    // NOTE(review): public and non-final, so other code could reassign it; left
+    // as is because callers are not visible from here.
+    public static int ACK_WINDOW_SIZE = 1024;
+    private final BitSet bitSet;
+
+    /**
+     * @param partition
+     *            currently unused
+     * @param base
+     *            currently unused
+     */
+    public IntakePartitionStatistics(int partition, int base) {
+        this.bitSet = new BitSet(ACK_WINDOW_SIZE);
+    }
+
+    /**
+     * Sets the bit that corresponds to {@code recordId}.
+     *
+     * @param recordId
+     *            id of the acknowledged record; negative ids are accepted
+     */
+    public void ackRecordId(int recordId) {
+        int posIndexWithinBase = recordId % ACK_WINDOW_SIZE;
+        // Java's % keeps the sign of the dividend; shift a negative remainder
+        // into [0, ACK_WINDOW_SIZE) because BitSet.set rejects negative indexes.
+        if (posIndexWithinBase < 0) {
+            posIndexWithinBase += ACK_WINDOW_SIZE;
+        }
+        this.bitSet.set(posIndexWithinBase);
+    }
+
+    /**
+     * @return the bit set as a newly allocated byte array, as produced by
+     *         {@link BitSet#toByteArray()}
+     */
+    public byte[] getAckInfo() {
+        return bitSet.toByteArray();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java
new file mode 100644
index 0000000..7a79e23
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/IntakeSideMonitoredBuffer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * {@link MonitoredBuffer} variant whose hook methods all return constants:
+ * only {@link #reportInflowRate()} answers {@code true}, every other boolean
+ * hook answers {@code false}, and no frame pre- or post-processor is supplied.
+ * How the base class acts on these answers is defined in
+ * {@link MonitoredBuffer}.
+ */
+public class IntakeSideMonitoredBuffer extends MonitoredBuffer {
+
+    /**
+     * Passes every argument, unchanged and in the same order, to the
+     * {@link MonitoredBuffer} constructor.
+     */
+    public IntakeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+    }
+
+    /** @return {@code true}; the only hook of this class that does */
+    @Override
+    protected boolean reportInflowRate() {
+        return true;
+    }
+
+    /** @return {@code false} */
+    @Override
+    protected boolean reportOutflowRate() {
+        return false;
+    }
+
+    /** @return {@code false} */
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return false;
+    }
+
+    /** @return {@code false} */
+    @Override
+    protected boolean monitorProcessingRate() {
+        return false;
+    }
+
+    /** @return {@code false} */
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return false;
+    }
+
+    /** @return {@code null}; no pre-processor is supplied */
+    @Override
+    protected IFramePreprocessor getFramePreProcessor() {
+        return null;
+    }
+
+    /** @return {@code null}; no post-processor is supplied */
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
new file mode 100644
index 0000000..db38edf
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
+import org.apache.asterix.external.feed.dataflow.DataBucket;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.MessageReceiver;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.LogInputOutputRateTask;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
+import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import org.apache.asterix.external.util.FeedFrameUtil;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
+
+    // Periods (in milliseconds) passed to Timer.scheduleAtFixedRate for the monitoring tasks.
+    protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds
+    protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second
+    protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds
+
+    // The processing rate is recomputed whenever frameCount reaches a multiple of this value.
+    protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 2nd frame
+
+    protected final IHyracksTaskContext ctx;
+    protected final FeedConnectionId connectionId;
+    protected final FeedRuntimeId runtimeId;
+    // Accessor reset on frames as they arrive / are about to be processed.
+    protected final FrameTupleAccessor inflowFta;
+    // Accessor reset on the frame handed to the writer; its tuple count feeds the outflow metrics.
+    protected final FrameTupleAccessor outflowFta;
+    protected final FeedRuntimeInputHandler inputHandler;
+    protected final IFrameEventCallback callback;
+    // Runs every monitoring TimerTask created by initializeMonitoring(); cancelled on EOD.
+    protected final Timer timer;
+    private final IExceptionHandler exceptionHandler;
+    protected final FeedPolicyAccessor policyAccessor;
+    protected int nPartitions;
+
+    // Downstream writer; replaceable through setFrameWriter().
+    private IFrameWriter frameWriter;
+    protected IFeedMetricCollector metricCollector;
+    // Monitoring switches; all are (re)computed in initializeMonitoring().
+    protected boolean monitorProcessingRate = false;
+    protected boolean monitorInputQueueLength = false;
+    protected boolean logInflowOutflowRate = false;
+    protected boolean reportOutflowRate = false;
+    protected boolean reportInflowRate = false;
+
+    // Ids returned by metricCollector.createReportSender(); -1 until a sender has been created.
+    protected int inflowReportSenderId = -1;
+    protected int outflowReportSenderId = -1;
+    // Timer tasks; each stays null when the corresponding monitoring is not enabled.
+    protected TimerTask monitorInputQueueLengthTask;
+    protected TimerTask processingRateTask;
+    protected TimerTask logInflowOutflowRateTask;
+    // Not assigned anywhere in this class; presumably set by a subclass -- TODO confirm.
+    protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
+    // "Frome" [sic]; not assigned anywhere in this class -- presumably set by a subclass.
+    protected StorageFrameHandler storageFromeHandler;
+
+    // Last measured processing rate in tuples/sec; -1 means "not measured yet".
+    protected int processingRate = -1;
+    // Frames seen since the processing rate was last recomputed.
+    protected int frameCount = 0;
+    // NOTE(review): never written in this class, so getAvgDelayRecordPersistence() returns 0 here.
+    private long avgDelayPersistence = 0;
+    // While false, processMessage() releases buckets without processing them.
+    private boolean active;
+    // NOTE(review): never written in this class, so getTupleTimeStats() returns null here.
+    private Map<Integer, Long> tupleTimeStats;
+    // Lazily obtained from getFramePostProcessor() / getFramePreProcessor() on first use.
+    IFramePostProcessor postProcessor = null;
+    IFramePreprocessor preProcessor = null;
+
+    /**
+     * Factory that selects the concrete buffer from the feed runtime type carried by
+     * {@code runtimeId}: COMPUTE, STORE and COLLECT map to the compute-side, storage-side and
+     * intake-side buffers respectively; every other type gets a {@link BasicMonitoredBuffer}.
+     * All arguments are forwarded unchanged to the chosen constructor.
+     *
+     * @return a newly constructed buffer of the selected kind
+     */
+    public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        final MonitoredBuffer buffer;
+        switch (runtimeId.getFeedRuntimeType()) {
+            case COMPUTE:
+                buffer = new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
+                        metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
+                        policyAccessor);
+                break;
+            case STORE:
+                buffer = new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
+                        metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
+                        policyAccessor);
+                break;
+            case COLLECT:
+                buffer = new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
+                        metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
+                        policyAccessor);
+                break;
+            default:
+                buffer = new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
+                        metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
+                        policyAccessor);
+                break;
+        }
+        return buffer;
+    }
+
+    /**
+     * Stores the collaborators, builds the inflow/outflow tuple accessors from {@code recordDesc},
+     * creates the timer, marks the buffer active and finally starts monitoring.
+     * <p>
+     * The {@code fta} argument is accepted but not used by this constructor. Note that
+     * {@link #initializeMonitoring()} is invoked from here, i.e. before any subclass constructor
+     * body has run.
+     */
+    protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+        // identity of this runtime
+        this.ctx = ctx;
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.nPartitions = nPartitions;
+        this.policyAccessor = policyAccessor;
+
+        // collaborators
+        this.inputHandler = inputHandler;
+        this.frameWriter = frameWriter;
+        this.metricCollector = metricCollector;
+        this.exceptionHandler = exceptionHandler;
+        this.callback = callback;
+
+        // objects owned by this buffer
+        this.inflowFta = new FrameTupleAccessor(recordDesc);
+        this.outflowFta = new FrameTupleAccessor(recordDesc);
+        this.timer = new Timer();
+
+        this.active = true;
+        // must stay last: it reads the fields assigned above
+        initializeMonitoring();
+    }
+
+    /**
+     * @return whether the processing rate should be measured; the periodic processing-rate task
+     *         is only scheduled when this is true and the policy is elastic
+     */
+    protected abstract boolean monitorProcessingRate();
+
+    /**
+     * @return whether this buffer asks for inflow/outflow rate logging; logging is also enabled
+     *         when the policy has statistics logging switched on
+     */
+    protected abstract boolean logInflowOutflowRate();
+
+    /** @return whether the outflow rate should be reported */
+    protected abstract boolean reportOutflowRate();
+
+    /** @return whether the inflow rate should be reported */
+    protected abstract boolean reportInflowRate();
+
+    /**
+     * @return whether the input queue length should be watched; the periodic task is only
+     *         scheduled when, additionally, the policy is elastic or enables throttling,
+     *         spill-to-disk or discard on congestion
+     */
+    protected abstract boolean monitorInputQueueLength();
+
+    /** @return the pre-processor applied before a frame is written, or null for none */
+    protected abstract IFramePreprocessor getFramePreProcessor();
+
+    /** @return the post-processor applied after a frame is written, or null for none */
+    protected abstract IFramePostProcessor getFramePostProcessor();
+
+    /**
+     * Evaluates the monitoring switches and schedules the corresponding timer tasks.
+     * <ul>
+     * <li>Processing-rate task: when processing-rate monitoring is on and the policy is elastic.</li>
+     * <li>Queue-length task: when queue-length monitoring is on and the policy is elastic or
+     * enables throttling, spill-to-disk or discard on congestion.</li>
+     * <li>Rate log/report task plus the two metric report senders: when logging or either kind
+     * of rate reporting is on.</li>
+     * </ul>
+     * All tasks are scheduled with an initial delay of 0.
+     */
+    protected void initializeMonitoring() {
+        monitorProcessingRate = monitorProcessingRate();
+        monitorInputQueueLength = monitorInputQueueLength();
+        reportInflowRate = reportInflowRate();
+        reportOutflowRate = reportOutflowRate();
+        logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
+
+        if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
+            this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
+                    connectionId, nPartitions);
+            this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
+        }
+
+        if (monitorInputQueueLength && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
+                || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
+            this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
+            this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
+        }
+
+        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+            // Create the report senders BEFORE scheduling the task: the task starts with zero delay
+            // on the timer thread and reads both sender ids through getInflowRate()/getOutflowRate().
+            // Scheduling first would let its first run observe the initial id of -1.
+            this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+                    ValueType.INFLOW_RATE, MetricType.RATE);
+            this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+                    ValueType.OUTFLOW_RATE, MetricType.RATE);
+            this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
+                    reportOutflowRate);
+            this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
+        }
+    }
+
+    protected void deinitializeMonitoring() {
+        if (monitorInputQueueLengthTask != null) {
+            monitorInputQueueLengthTask.cancel();
+        }
+        if (processingRateTask != null) {
+            processingRateTask.cancel();
+        }
+        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+            metricCollector.removeReportSender(inflowReportSenderId);
+            metricCollector.removeReportSender(outflowReportSenderId);
+            logInflowOutflowRateTask.cancel();
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Disabled monitoring for " + this.runtimeId);
+        }
+    }
+
+    /**
+     * Bookkeeping performed after a frame has been handed to the frame writer.
+     * <ol>
+     * <li>When processing-rate monitoring is on, every {@code PROCESS_RATE_REFRESH}-th frame
+     * updates {@code processingRate} (tuples/sec) from the outflow tuple count and the time
+     * elapsed since {@code startTime}.</li>
+     * <li>When rate logging or outflow reporting is on, the outflow tuple count is sent to the
+     * metric collector.</li>
+     * <li>The frame is passed to the post-processor, if there is one.</li>
+     * </ol>
+     *
+     * @param startTime
+     *            wall-clock time in milliseconds taken just before the frame was processed
+     * @param frame
+     *            the frame that was written
+     * @throws Exception
+     *             whatever the post-processor throws
+     */
+    protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
+        if (monitorProcessingRate) {
+            frameCount++;
+            if (frameCount % PROCESS_RATE_REFRESH == 0) {
+                // Clamp to 1 ms. A frame handled within the same clock millisecond would otherwise
+                // divide by zero, which (in floating point) turns into Integer.MAX_VALUE after the
+                // cast and makes the measured rate meaningless for throttling and scale-in checks.
+                long elapsedMillis = Math.max(1L, System.currentTimeMillis() - startTime);
+                processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / elapsedMillis);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
+                }
+                frameCount = 0;
+            }
+        }
+
+        if (logInflowOutflowRate || reportOutflowRate) {
+            metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
+        }
+
+        postProcessFrame(frame);
+
+    }
+
+    /**
+     * Runs the frame through the pre-processor, obtaining the pre-processor lazily on first use.
+     * When {@link #getFramePreProcessor()} yields null nothing is cached and the frame is left
+     * untouched.
+     */
+    protected void preProcessFrame(ByteBuffer frame) throws Exception {
+        if (preProcessor == null) {
+            preProcessor = getFramePreProcessor();
+            if (preProcessor == null) {
+                // no pre-processing configured for this buffer
+                return;
+            }
+        }
+        preProcessor.preProcess(frame);
+    }
+
+    /**
+     * Runs the frame through the post-processor, obtaining the post-processor lazily on first
+     * use. The outflow accessor is reset on the frame only when a post-processor exists.
+     */
+    protected void postProcessFrame(ByteBuffer frame) throws Exception {
+        if (postProcessor == null) {
+            postProcessor = getFramePostProcessor();
+            if (postProcessor == null) {
+                // no post-processing configured for this buffer
+                return;
+            }
+        }
+        outflowFta.reset(frame);
+        postProcessor.postProcessFrame(frame, outflowFta);
+    }
+
+    /** Enqueues the bucket by adding it to the inbox. */
+    @Override
+    public void sendMessage(DataBucket message) {
+        this.inbox.add(message);
+    }
+
+    /**
+     * Sends the tuple count of {@code frame} to the metric collector as inflow, provided rate
+     * logging or inflow reporting is on and the input handler is in neither PROCESS_BACKLOG nor
+     * PROCESS_SPILL mode.
+     */
+    public void sendReport(ByteBuffer frame) {
+        if (!(logInflowOutflowRate || reportInflowRate)) {
+            return;
+        }
+        // frames replayed from backlog or spill are not counted as inflow
+        if (!inputHandler.getMode().equals(Mode.PROCESS_BACKLOG)
+                && !inputHandler.getMode().equals(Mode.PROCESS_SPILL)) {
+            inflowFta.reset(frame);
+            metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
+        }
+    }
+
+    /**
+     * @return the metric currently held for the inflow report sender (tuples/sec)
+     */
+    public int getInflowRate() {
+        return this.metricCollector.getMetric(this.inflowReportSenderId);
+    }
+
+    /**
+     * @return the metric currently held for the outflow report sender (tuples/sec)
+     */
+    public int getOutflowRate() {
+        return this.metricCollector.getMetric(this.outflowReportSenderId);
+    }
+
+    /**
+     * @return the current size of the inbox, i.e. the number of pending buckets
+     */
+    public int getWorkSize() {
+        return this.inbox.size();
+    }
+
+    /**
+     * Propagates a new partition count (cardinality) to the processing-rate task. Nothing
+     * happens when that task was never created or already holds the given value.
+     */
+    public void setNumberOfPartitions(int nPartitions) {
+        if (processingRateTask == null) {
+            return;
+        }
+        final MonitoreProcessRateTimerTask rateTask = (MonitoreProcessRateTimerTask) processingRateTask;
+        if (rateTask.getNumberOfPartitions() != nPartitions) {
+            rateTask.setNumberOfPartitions(nPartitions);
+        }
+    }
+
+    /** @return the input handler this buffer was created with */
+    public FeedRuntimeInputHandler getInputHandler() {
+        return this.inputHandler;
+    }
+
+    /**
+     * Closes the underlying receiver and marks this buffer inactive.
+     *
+     * @param processPending
+     *            forwarded to the superclass {@code close}
+     * @param disableMonitoring
+     *            when true, monitoring tasks and report senders are torn down as well
+     */
+    public synchronized void close(boolean processPending, boolean disableMonitoring) {
+        super.close(processPending);
+
+        if (disableMonitoring) {
+            deinitializeMonitoring();
+        }
+
+        // from now on processMessage() only releases incoming buckets
+        this.active = false;
+    }
+
+    /**
+     * Processes one bucket taken from the inbox.
+     * <ul>
+     * <li>DATA: optionally throttles the frame, then runs it through the pre-processor, the frame
+     * writer and the post-processor. A failure in that pipeline is logged and handed to the
+     * exception handler; the frame is not re-submitted. The bucket is then marked as read.</li>
+     * <li>EOD: marks the bucket as read, cancels the timer and signals FINISHED_PROCESSING.</li>
+     * <li>EOSD: marks the bucket as read and signals FINISHED_PROCESSING_SPILLAGE.</li>
+     * </ul>
+     * Buckets arriving while the buffer is inactive are marked as read without being processed.
+     *
+     * @throws Exception
+     *             if throttling fails or the exception handler itself throws
+     */
+    @Override
+    public synchronized void processMessage(DataBucket message) throws Exception {
+        if (!active) {
+            message.doneReading();
+            return;
+        }
+        switch (message.getContentType()) {
+            case DATA:
+                ByteBuffer frameToProcess = applyThrottling(message.getContent());
+                outflowFta.reset(frameToProcess);
+                try {
+                    inflowFta.reset(frameToProcess);
+                    long startTime = System.currentTimeMillis();
+                    preProcessFrame(frameToProcess);
+                    frameWriter.nextFrame(frameToProcess);
+                    postProcessFrame(startTime, frameToProcess);
+                } catch (Exception e) {
+                    // Log through the class logger instead of printing to stderr, then let the
+                    // handler deal with the failure. Its return value is not used: as before, the
+                    // frame is processed at most once.
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Exception in processing frame for " + runtimeId, e);
+                    }
+                    exceptionHandler.handleException(e, frameToProcess);
+                }
+                message.doneReading();
+                break;
+            case EOD:
+                message.doneReading();
+                timer.cancel();
+                callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
+                break;
+            case EOSD:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Done processing spillage");
+                }
+                message.doneReading();
+                callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
+                break;
+
+        }
+    }
+
+    /**
+     * Returns the frame that should actually be processed: the received frame itself, or a
+     * sampled copy when throttling is enabled and the inflow rate exceeds the processing rate.
+     * The sample keeps 80% of the fraction {@code processingRate / inflowRate}.
+     */
+    private ByteBuffer applyThrottling(ByteBuffer frameReceived) throws HyracksDataException {
+        if (!inputHandler.isThrottlingEnabled()) {
+            return frameReceived;
+        }
+        inflowFta.reset(frameReceived);
+        int pRate = getProcessingRate();
+        int inflowRate = getInflowRate();
+        // A negative processing rate is the "not measured yet" sentinel (-1). Throttling against
+        // it would produce a negative retain fraction, so the frame is passed through untouched.
+        if (pRate < 0 || inflowRate <= pRate) {
+            return frameReceived;
+        }
+        double retainFraction = (pRate * 0.8 / inflowRate);
+        ByteBuffer throttledFrame = throttleFrame(inflowFta, retainFraction);
+        inflowFta.reset(throttledFrame);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Throttling at fraction " + retainFraction + " inflow rate " + inflowRate
+                    + " no of tuples remaining " + inflowFta.getTupleCount());
+        }
+        return throttledFrame;
+    }
+
+    private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
+        int desiredTuples = (int) (fta.getTupleCount() * retainFraction);
+        return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
+    }
+
+    /** @return the mode currently reported by the input handler */
+    public Mode getMode() {
+        return this.inputHandler.getMode();
+    }
+
+    /** @return the id of the runtime this buffer belongs to */
+    public FeedRuntimeId getRuntimeId() {
+        return this.runtimeId;
+    }
+
+    /** Replaces the writer that processed frames are forwarded to. */
+    public void setFrameWriter(IFrameWriter frameWriter) {
+        this.frameWriter = frameWriter;
+    }
+
+    /**
+     * Re-activates the buffer and, when rate logging is on, resets both metric report senders.
+     * <p>
+     * NOTE(review): the senders are created when logging OR either kind of rate reporting is
+     * on, yet they are only reset here when logging is on -- confirm this is intended.
+     */
+    public void reset() {
+        this.active = true;
+        if (!logInflowOutflowRate) {
+            return;
+        }
+        metricCollector.resetReportSender(inflowReportSenderId);
+        metricCollector.resetReportSender(outflowReportSenderId);
+    }
+
+    /** @return the last measured processing rate in tuples/sec, or -1 if none was measured yet */
+    public int getProcessingRate() {
+        return this.processingRate;
+    }
+
+    /** @return the tuple time statistics map as currently held by this buffer (may be null) */
+    public Map<Integer, Long> getTupleTimeStats() {
+        return this.tupleTimeStats;
+    }
+
+    /** @return the average persistence delay as currently held by this buffer */
+    public long getAvgDelayRecordPersistence() {
+        return this.avgDelayPersistence;
+    }
+
+    /** @return the storage time-tracking task, or null when none has been set */
+    public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
+        return this.storageTimeTrackingRateTask;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
new file mode 100644
index 0000000..86c6bca
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMessageService;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
+import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedReportMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
+import org.apache.asterix.external.feed.message.ScaleInReportMessage;
+import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+
+public class MonitoredBufferTimerTasks {
+
+    private static final Logger LOGGER = Logger.getLogger(MonitorInputQueueLengthTimerTask.class.getName());
+
+    /**
+     * Periodic task for a storage-side buffer. On each run it
+     * <ul>
+     * <li>sends commit acknowledgements for bases not yet covered, when acking is enabled and
+     * throttling is not, and</li>
+     * <li>checks the average persistence delay against the policy limit, when time tracking is
+     * enabled.</li>
+     * </ul>
+     */
+    public static class MonitoredBufferStorageTimerTask extends TimerTask {
+
+        /** A storage report is sent once the delay limit was exceeded more than this many runs in a row. */
+        private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
+
+        private final StorageSideMonitoredBuffer mBuffer;
+        private final IFeedManager feedManager;
+        private final int partition;
+        private final FeedConnectionId connectionId;
+        private final FeedPolicyAccessor policyAccessor;
+        private final StorageFrameHandler storageFromeHandler;
+        // Both messages are allocated once and re-used via reset(...) for every send.
+        private final StorageReportFeedMessage storageReportMessage;
+        private final FeedTupleCommitAckMessage tupleCommitAckMessage;
+
+        /** Highest acknowledged window per intake partition, as reported back via commit responses. */
+        private Map<Integer, Integer> maxIntakeBaseCovered;
+        private int countDelayExceeded = 0;
+
+        public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
+                FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
+                StorageFrameHandler storageFromeHandler) {
+            this.mBuffer = mBuffer;
+            this.storageFromeHandler = storageFromeHandler;
+            this.feedManager = feedManager;
+            this.policyAccessor = policyAccessor;
+            this.connectionId = connectionId;
+            this.partition = partition;
+            this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
+            this.storageReportMessage = new StorageReportFeedMessage(connectionId, partition, 0, false, 0, 0);
+            this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(connectionId, 0, 0, null);
+        }
+
+        @Override
+        public void run() {
+            boolean shouldAck = mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled();
+            if (shouldAck) {
+                ackRecords();
+            }
+            if (mBuffer.isTimeTrackingEnabled()) {
+                checkLatencyViolation();
+            }
+        }
+
+        /**
+         * For every intake partition with statistics: sends an ack message for each base above the
+         * highest window already acknowledged for that partition, and drops the remaining
+         * (already covered) bases from the handler's map.
+         */
+        private void ackRecords() {
+            for (int intakePartition : storageFromeHandler.getPartitionsWithStats()) {
+                Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
+                        .getBaseAcksForPartition(intakePartition);
+                // collected first and removed afterwards so the map is not modified while iterating
+                List<Integer> coveredBases = new ArrayList<Integer>();
+                for (Entry<Integer, IntakePartitionStatistics> ack : baseAcks.entrySet()) {
+                    int base = ack.getKey();
+                    Integer maxCovered = maxIntakeBaseCovered.get(intakePartition);
+                    boolean alreadyCovered = maxCovered != null && maxCovered >= base;
+                    if (alreadyCovered) {
+                        coveredBases.add(base);
+                        continue;
+                    }
+                    tupleCommitAckMessage.reset(intakePartition, base, ack.getValue().getAckInfo());
+                    feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
+                }
+                for (Integer covered : coveredBases) {
+                    baseAcks.remove(covered);
+                }
+            }
+        }
+
+        /**
+         * Counts consecutive runs in which the handler's average persistence delay exceeds the
+         * policy maximum and sends a storage report once the count passes
+         * {@code PERSISTENCE_DELAY_VIOLATION_MAX}; a run within the limit resets the count.
+         * <p>
+         * NOTE(review): the delay compared here comes from the storage frame handler, whereas the
+         * delay placed in the report comes from the buffer -- confirm both are meant to differ.
+         */
+        private void checkLatencyViolation() {
+            long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
+            if (avgDelayPersistence <= policyAccessor.getMaxDelayRecordPersistence()) {
+                countDelayExceeded = 0;
+                return;
+            }
+            countDelayExceeded++;
+            if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
+                storageReportMessage.reset(0, false, mBuffer.getAvgDelayRecordPersistence());
+                feedManager.getFeedMessageService().sendMessage(storageReportMessage);
+            }
+        }
+
+        /** Records the highest window acknowledged for the intake partition named in the response. */
+        public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
+            maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
+        }
+    }
+
+    /**
+     * Periodic task that logs and/or reports the inflow and outflow rates of a buffer.
+     * <p>
+     * A single report message is prepared when reporting is requested. It carries the inflow rate
+     * when {@code reportInflow} is set, otherwise the outflow rate; when both flags are set only
+     * the inflow rate is reported. When neither flag is set the task only logs.
+     */
+    public static class LogInputOutputRateTask extends TimerTask {
+
+        private final MonitoredBuffer mBuffer;
+        private final boolean log;
+        private final boolean reportInflow;
+        private final boolean reportOutflow;
+
+        // Both are null in log-only mode (neither reportInflow nor reportOutflow).
+        private final IFeedMessageService messageService;
+        private final FeedReportMessage message;
+
+        /**
+         * @param mBuffer
+         *            buffer whose rates and pending work are read on every run
+         * @param log
+         *            whether the rates are written to the log
+         * @param reportInflow
+         *            whether the inflow rate is sent through the feed message service
+         * @param reportOutflow
+         *            whether the outflow rate is sent (only effective when reportInflow is false)
+         */
+        public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
+            this.mBuffer = mBuffer;
+            this.log = log;
+            this.reportInflow = reportInflow;
+            this.reportOutflow = reportOutflow;
+            if (reportInflow || reportOutflow) {
+                ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
+                messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
+                message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
+                        vType, 0);
+            } else {
+                messageService = null;
+                message = null;
+            }
+
+        }
+
+        @Override
+        public void run() {
+            int pendingWork = mBuffer.getWorkSize();
+            int outflowRate = mBuffer.getOutflowRate();
+            int inflowRate = mBuffer.getInflowRate();
+            if (log) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
+                            + outflowRate + " Pending Work " + pendingWork);
+                }
+            }
+            if (!reportInflow && !reportOutflow) {
+                // Log-only mode: there is no message and no message service. Sending here would
+                // throw a NullPointerException, and an exception escaping a TimerTask terminates
+                // the Timer thread together with every other task scheduled on it.
+                return;
+            }
+            message.reset(reportInflow ? inflowRate : outflowRate);
+            messageService.sendMessage(message);
+        }
+    }
+
+    /**
+     * Periodic task that watches the number of pending buckets of a buffer and raises
+     * PENDING_WORK_THRESHOLD_REACHED / PENDING_WORK_DONE events through the callback. Nothing is
+     * evaluated while the buffer is in PROCESS_SPILL or PROCESS_BACKLOG mode.
+     * <p>
+     * NOTE(review): the exceed counter is only cleared when the threshold event fires, not when
+     * a run finds the queue back under the threshold -- confirm whether "successive" periods
+     * are really meant to accumulate across such runs.
+     */
+    public static class MonitorInputQueueLengthTimerTask extends TimerTask {
+
+        private final MonitoredBuffer mBuffer;
+        private final IFrameEventCallback callback;
+        // both limits are read once from the feed properties at construction time
+        private final int pendingWorkThreshold;
+        private final int maxSuccessiveThresholdPeriods;
+        private FrameEvent lastEvent = FrameEvent.NO_OP;
+        private int pendingWorkExceedCount = 0;
+
+        public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
+            this.mBuffer = mBuffer;
+            this.callback = callback;
+            AsterixFeedProperties feedProperties = mBuffer.getInputHandler().getFeedManager()
+                    .getAsterixFeedProperties();
+            this.pendingWorkThreshold = feedProperties.getPendingWorkThreshold();
+            this.maxSuccessiveThresholdPeriods = feedProperties.getMaxSuccessiveThresholdPeriod();
+        }
+
+        @Override
+        public void run() {
+            int pendingWork = mBuffer.getWorkSize();
+            if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
+                return;
+            }
+
+            switch (lastEvent) {
+                case NO_OP:
+                case PENDING_WORK_DONE:
+                case FINISHED_PROCESSING_SPILLAGE:
+                    checkForCongestion(pendingWork);
+                    break;
+                case PENDING_WORK_THRESHOLD_REACHED:
+                    checkForRecovery(pendingWork);
+                    break;
+                case FINISHED_PROCESSING:
+                    break;
+
+            }
+        }
+
+        /**
+         * No congestion signalled so far: count runs above the threshold and fire the threshold
+         * event once the count exceeds the configured maximum; alternatively signal that pending
+         * work is done when the queue is empty while the buffer is in SPILL mode.
+         */
+        private void checkForCongestion(int pendingWork) {
+            if (pendingWork > pendingWorkThreshold) {
+                pendingWorkExceedCount++;
+                if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
+                    pendingWorkExceedCount = 0;
+                    raise(FrameEvent.PENDING_WORK_THRESHOLD_REACHED);
+                }
+                return;
+            }
+            if (pendingWork == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
+                raise(FrameEvent.PENDING_WORK_DONE);
+            }
+        }
+
+        /** Congestion was signalled: report recovery once the queue is at most half the threshold. */
+        private void checkForRecovery(int pendingWork) {
+            double fillRatio = pendingWork / (double) pendingWorkThreshold;
+            if (fillRatio <= 0.5) {
+                raise(FrameEvent.PENDING_WORK_DONE);
+            }
+        }
+
+        /** Remembers the event as the last one raised and forwards it to the callback. */
+        private void raise(FrameEvent event) {
+            lastEvent = event;
+            callback.frameEvent(lastEvent);
+        }
+    }
+
+    /**
+     * Timer task that compares the buffer's inflow rate with its processing rate and, when the
+     * inflow is low enough, proposes a scale-in (a smaller number of partitions) by sending a
+     * {@link ScaleInReportMessage}. After a proposal has been sent no further proposals are made
+     * until {@link #setNumberOfPartitions(int)} is called.
+     */
+    public static class MonitoreProcessRateTimerTask extends TimerTask {
+
+        private final MonitoredBuffer mBuffer;
+        private final IFeedManager feedManager;
+        private int nPartitions;
+        // allocated once and re-used via reset(...) for every proposal
+        private ScaleInReportMessage sMessage;
+        // true between sending a proposal and the next setNumberOfPartitions(...) call
+        private boolean proposedChange;
+
+        public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
+                FeedConnectionId connectionId, int nPartitions) {
+            this.mBuffer = mBuffer;
+            this.feedManager = feedManager;
+            this.nPartitions = nPartitions;
+            this.proposedChange = false;
+            this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
+        }
+
+        public int getNumberOfPartitions() {
+            return this.nPartitions;
+        }
+
+        /** Stores the new partition count and clears the pending-proposal flag. */
+        public void setNumberOfPartitions(int nPartitions) {
+            this.nPartitions = nPartitions;
+            this.proposedChange = false;
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
+            }
+        }
+
+        @Override
+        public void run() {
+            if (proposedChange) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
+                }
+                return;
+            }
+
+            int inflowRate = mBuffer.getInflowRate();
+            int procRate = mBuffer.getProcessingRate();
+            if (!(inflowRate > 0 && procRate > 0)) {
+                // at least one of the rates is not available yet
+                return;
+            }
+
+            if (inflowRate >= procRate) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
+                            + ")");
+                }
+                return;
+            }
+
+            // partitions needed if each keeps processing at the measured rate
+            int possibleCardinality = (int) Math.ceil(nPartitions * inflowRate / (double) procRate);
+            // only propose when at least a quarter of the partitions could be released
+            if (possibleCardinality < nPartitions
+                    && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= 0.25)) {
+                sMessage.reset(nPartitions, possibleCardinality);
+                feedManager.getFeedMessageService().sendMessage(sMessage);
+                proposedChange = true;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Proposed scale-in " + sMessage);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
new file mode 100644
index 0000000..d3919b5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * Counts the feed runtimes placed on a single node.
+ * <p>
+ * Instances order themselves by ascending runtime count (see
+ * {@link #compareTo(NodeLoad)}). {@code equals}/{@code hashCode} are not
+ * overridden, so the ordering is not consistent with equals: two distinct
+ * nodes with the same runtime count compare as 0.
+ */
+public class NodeLoad implements Comparable<NodeLoad> {
+
+    private final String nodeId;
+
+    private int nRuntimes;
+
+    /**
+     * @param nodeId
+     *            identifier of the node whose load is tracked; the count starts at 0
+     */
+    public NodeLoad(String nodeId) {
+        this.nodeId = nodeId;
+        this.nRuntimes = 0;
+    }
+
+    /** Registers one additional runtime on this node. */
+    public void addLoad() {
+        nRuntimes++;
+    }
+
+    /**
+     * Registers the removal of one runtime from this node.
+     *
+     * @param runtimeType
+     *            currently not consulted; every runtime type weighs the same
+     */
+    public void removeLoad(FeedRuntimeType runtimeType) {
+        nRuntimes--;
+    }
+
+    /**
+     * Orders nodes by ascending number of runtimes.
+     *
+     * @return a negative value, zero or a positive value when this node hosts
+     *         fewer, as many or more runtimes than {@code o}
+     */
+    @Override
+    public int compareTo(NodeLoad o) {
+        if (this == o) {
+            return 0;
+        }
+        // Integer.compare avoids the overflow that plain subtraction can
+        // produce for operands of opposite sign.
+        return Integer.compare(nRuntimes, o.getnRuntimes());
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public int getnRuntimes() {
+        return nRuntimes;
+    }
+
+    public void setnRuntimes(int nRuntimes) {
+        this.nRuntimes = nRuntimes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
new file mode 100644
index 0000000..bfddcf6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.util.FeedConstants;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class NodeLoadReport implements Comparable<NodeLoadReport> {
+
+    private final String nodeId;
+    private float cpuLoad;
+    private double usedHeap;
+    private int nRuntimes;
+
+    public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
+        this.nodeId = nodeId;
+        this.cpuLoad = cpuLoad;
+        this.usedHeap = usedHeap;
+        this.nRuntimes = nRuntimes;
+    }
+
+    public static NodeLoadReport read(JSONObject obj) throws JSONException {
+        NodeLoadReport r = new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
+                (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
+                (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
+                obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
+        return r;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof NodeLoadReport)) {
+            return false;
+        }
+        return ((NodeLoadReport) o).nodeId.equals(nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return nodeId.hashCode();
+    }
+
+    @Override
+    public int compareTo(NodeLoadReport o) {
+        if (nRuntimes != o.getnRuntimes()) {
+            return nRuntimes - o.getnRuntimes();
+        } else {
+            return (int) (this.cpuLoad - ((NodeLoadReport) o).cpuLoad);
+        }
+    }
+
+    public float getCpuLoad() {
+        return cpuLoad;
+    }
+
+    public void setCpuLoad(float cpuLoad) {
+        this.cpuLoad = cpuLoad;
+    }
+
+    public double getUsedHeap() {
+        return usedHeap;
+    }
+
+    public void setUsedHeap(double usedHeap) {
+        this.usedHeap = usedHeap;
+    }
+
+    public int getnRuntimes() {
+        return nRuntimes;
+    }
+
+    public void setnRuntimes(int nRuntimes) {
+        this.nRuntimes = nRuntimes;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
new file mode 100644
index 0000000..f651935
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMessageService;
+import org.apache.asterix.external.feed.api.IFeedService;
+import org.apache.asterix.external.feed.message.NodeReportMessage;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+public class NodeLoadReportService implements IFeedService {
+
+    private static final int NODE_LOAD_REPORT_FREQUENCY = 2000;
+    private static final float CPU_CHANGE_THRESHOLD = 0.2f;
+    private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
+
+    private final NodeLoadReportTask task;
+    private final Timer timer;
+
+    public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
+        this.task = new NodeLoadReportTask(nodeId, feedManager);
+        this.timer = new Timer();
+    }
+
+    @Override
+    public void start() throws Exception {
+        timer.schedule(task, 0, NODE_LOAD_REPORT_FREQUENCY);
+    }
+
+    @Override
+    public void stop() {
+        timer.cancel();
+    }
+
+    private static class NodeLoadReportTask extends TimerTask {
+
+        private final IFeedManager feedManager;
+        private final NodeReportMessage message;
+        private final IFeedMessageService messageService;
+
+        private static OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
+        private static MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
+
+        public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
+            this.feedManager = feedManager;
+            this.message = new NodeReportMessage(0.0f, 0L, 0);
+            this.messageService = feedManager.getFeedMessageService();
+        }
+
+        @Override
+        public void run() {
+            List<FeedRuntimeId> runtimeIds = feedManager.getFeedConnectionManager().getRegisteredRuntimes();
+            int nRuntimes = runtimeIds.size();
+            double cpuLoad = getCpuLoad();
+            double usedHeap = getUsedHeap();
+            if (sendMessage(nRuntimes, cpuLoad, usedHeap)) {
+                message.reset(cpuLoad, usedHeap, nRuntimes);
+                messageService.sendMessage(message);
+            }
+        }
+
+        private boolean sendMessage(int nRuntimes, double cpuLoad, double usedHeap) {
+            if (message == null) {
+                return true;
+            }
+
+            boolean changeInCpu = (Math.abs(cpuLoad - message.getCpuLoad())
+                    / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
+            boolean changeInUsedHeap = (Math.abs(usedHeap - message.getUsedHeap())
+                    / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
+            boolean changeInRuntimeSize = nRuntimes != message.getnRuntimes();
+            return changeInCpu || changeInUsedHeap || changeInRuntimeSize;
+        }
+
+        private double getCpuLoad() {
+            return osBean.getSystemLoadAverage();
+        }
+
+        private double getUsedHeap() {
+            return ((double) memBean.getHeapMemoryUsage().getUsed()) / memBean.getHeapMemoryUsage().getMax();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
new file mode 100644
index 0000000..ec95371
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+
+/**
+ * Base class for a metric series: it carries the metric type chosen at
+ * construction time and an integer running sum that subclasses maintain.
+ */
+public abstract class Series {
+
+    /** Kind of metric this series represents; never changes. */
+    protected final MetricType type;
+    /** Sum maintained by the concrete series. */
+    protected int runningSum;
+
+    public Series(MetricType type) {
+        this.type = type;
+    }
+
+    public MetricType getType() {
+        return this.type;
+    }
+
+    public int getRunningSum() {
+        return this.runningSum;
+    }
+
+    /**
+     * Feeds one observation into the series; how it contributes is defined by
+     * the subclass.
+     */
+    public abstract void addValue(int value);
+
+    /** Resets the series; the exact state cleared is defined by the subclass. */
+    public abstract void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
new file mode 100644
index 0000000..f75379d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+
+/**
+ * Series that reports the integer average of the non-negative values added
+ * since construction or since the last {@link #reset()}.
+ */
+public class SeriesAvg extends Series {
+
+    /** Number of values accumulated in {@code runningSum}. */
+    private int count;
+
+    public SeriesAvg() {
+        super(MetricType.AVG);
+    }
+
+    /**
+     * @return the integer (truncated) average of the accumulated values, or 0
+     *         when no value has been added yet
+     */
+    public synchronized int getAvg() {
+        // Guard against division by zero before the first value / after reset.
+        return count == 0 ? 0 : runningSum / count;
+    }
+
+    /**
+     * Adds a value to the series; negative values are ignored.
+     */
+    @Override
+    public synchronized void addValue(int value) {
+        if (value < 0) {
+            return;
+        }
+        runningSum += value;
+        count++;
+    }
+
+    /**
+     * Clears both the sum and the count so that values added before the reset
+     * do not leak into later averages.
+     */
+    @Override
+    public synchronized void reset() {
+        runningSum = 0;
+        count = 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
new file mode 100644
index 0000000..91eea87
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
+
+/**
+ * Series that reports a per-second rate. A timer task samples the running sum
+ * every {@link #REFRESH_MEASUREMENT} milliseconds and derives the rate from
+ * the growth since the previous sample.
+ * <p>
+ * The timer is started by the constructor; call {@link #end()} to stop it.
+ */
+public class SeriesRate extends Series {
+
+    private static final long REFRESH_MEASUREMENT = 5000; // 5 seconds
+
+    // Written by the timer thread, read by callers of getRate().
+    private volatile int rate;
+    private Timer timer;
+    private RateComputingTask task;
+
+    public SeriesRate() {
+        super(MetricType.RATE);
+        begin();
+    }
+
+    /** @return the rate (values per second) computed at the last sample */
+    public int getRate() {
+        return rate;
+    }
+
+    /** Adds a value to the running sum; negative values are ignored. */
+    @Override
+    public synchronized void addValue(int value) {
+        if (value < 0) {
+            return;
+        }
+        runningSum += value;
+    }
+
+    /** Starts the periodic rate computation unless a timer already exists. */
+    public void begin() {
+        if (timer == null) {
+            timer = new Timer();
+            task = new RateComputingTask(this);
+            timer.scheduleAtFixedRate(task, 0, REFRESH_MEASUREMENT);
+        }
+    }
+
+    /** Cancels the periodic rate computation. */
+    public void end() {
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    /**
+     * Clears the sum, the rate and the sampling baseline together. Leaving the
+     * sum untouched while zeroing the baseline would make the next sample
+     * report the whole accumulated sum as fresh growth.
+     */
+    @Override
+    public synchronized void reset() {
+        runningSum = 0;
+        rate = 0;
+        if (task != null) {
+            task.reset();
+        }
+    }
+
+    private class RateComputingTask extends TimerTask {
+
+        // Guarded by the monitor of 'series'.
+        private int lastMeasured = 0;
+        private final SeriesRate series;
+
+        public RateComputingTask(SeriesRate series) {
+            this.series = series;
+        }
+
+        @Override
+        public void run() {
+            // Same lock as addValue()/reset() so sum and baseline stay coherent.
+            synchronized (series) {
+                int currentValue = series.getRunningSum();
+                // Widen before multiplying: (delta * 1000) overflows int for
+                // deltas above roughly 2.1 million per interval.
+                long delta = (long) currentValue - lastMeasured;
+                rate = (int) ((delta * 1000L) / REFRESH_MEASUREMENT);
+                lastMeasured = currentValue;
+            }
+        }
+
+        /** Resets the baseline; invoked while holding the series monitor. */
+        public void reset() {
+            lastMeasured = 0;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
new file mode 100644
index 0000000..1f9551d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.watch;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedMetricCollector;
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFramePostProcessor;
+import org.apache.asterix.external.feed.api.IFramePreprocessor;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * {@link MonitoredBuffer} variant for the storage side of a feed. When the
+ * feed policy enables at-least-once semantics (acking) or time tracking, it
+ * creates a {@link StorageFrameHandler} and schedules a storage timer task at
+ * a fixed rate; its frame post-processor then reads each tuple's intake
+ * timestamp, logs average travel times and stamps the store timestamp into
+ * the frame.
+ */
+public class StorageSideMonitoredBuffer extends MonitoredBuffer {
+
+    // Period, in milliseconds, of the storage time-tracking timer task.
+    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 5 seconds
+
+    private boolean ackingEnabled;
+    private final boolean timeTrackingEnabled;
+
+    /**
+     * Forwards all arguments to the superclass, then reads the acking and
+     * time-tracking flags from {@code policyAccessor}. If either flag is set,
+     * the storage frame handler and the periodic storage timer task are set up.
+     */
+    public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+        timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
+        ackingEnabled = policyAccessor.atleastOnceSemantics();
+        if (ackingEnabled || timeTrackingEnabled) {
+            storageFromeHandler = new StorageFrameHandler();
+            this.storageTimeTrackingRateTask = new MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask(this,
+                    inputHandler.getFeedManager(), connectionId, runtimeId.getPartition(), policyAccessor,
+                    storageFromeHandler);
+            this.timer.scheduleAtFixedRate(storageTimeTrackingRateTask, 0, STORAGE_TIME_TRACKING_FREQUENCY);
+        }
+    }
+
+    /** Always {@code false} for the storage side. */
+    @Override
+    protected boolean monitorProcessingRate() {
+        return false;
+    }
+
+    /** Always {@code true} for the storage side. */
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return true;
+    }
+
+    /**
+     * Returns a new pre-processor that, while acking is enabled, passes each
+     * frame to the storage frame handler to update its tracking information.
+     */
+    @Override
+    public IFramePreprocessor getFramePreProcessor() {
+        return new IFramePreprocessor() {
+
+            @Override
+            public void preProcess(ByteBuffer frame) {
+                try {
+                    if (ackingEnabled) {
+                        // NOTE(review): this class assigns storageFromeHandler only in the
+                        // constructor, and only if acking or time tracking was enabled then;
+                        // enabling acking later through setAcking(true) may hit a null
+                        // handler here - confirm against MonitoredBuffer.
+                        storageFromeHandler.updateTrackingInformation(frame, inflowFta);
+                    }
+                } catch (Exception e) {
+                    // Failures are only printed; the frame continues through the pipeline.
+                    e.printStackTrace();
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a new post-processor holding its own travel-time statistics.
+     * It does nothing unless acking or time tracking is enabled.
+     */
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return new IFramePostProcessor() {
+
+            // Upper bounds on (intake timestamp - first intake timestamp) for the
+            // first and second window; presumably milliseconds, since delays are
+            // derived from System.currentTimeMillis() - TODO confirm.
+            private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
+            private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
+
+            // Per-frame accumulated delay for each window.
+            private long delayNormalWindow = 0;
+            private long delayHighWindow = 0;
+            private long delayLowWindow = 0;
+
+            // Per-frame number of tuples that fell in each window.
+            private int countNormalWindow;
+            private int countHighWindow;
+            private int countLowWindow;
+
+            // Intake timestamp of the first tuple ever seen; 0 means "not seen yet".
+            private long beginIntakeTimestamp = 0;
+
+            /**
+             * For every tuple in the frame: locates the intake timestamp, adds
+             * the tuple's delay to the matching window, and writes the current
+             * time into the store-timestamp slot. Averages are logged and the
+             * accumulators cleared once per frame.
+             */
+            @Override
+            public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
+                if (ackingEnabled || timeTrackingEnabled) {
+                    int nTuples = frameAccessor.getTupleCount();
+                    long intakeTimestamp;
+                    long currentTime = System.currentTimeMillis();
+                    for (int i = 0; i < nTuples; i++) {
+                        // The offsets below assume a specific serialized record layout in
+                        // which the INTAKE_TUPLEID, INTAKE_PARTITION, INTAKE_TIMESTAMP and
+                        // STORE_TIMESTAMP fields follow the open-part header in that order
+                        // - TODO confirm against the record serializer.
+                        int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
+                        int openPartOffsetOrig = frame.getInt(recordStart + 6);
+                        int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
+
+                        int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+                                + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
+
+                        int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
+                                + 1;
+
+                        int intakeTimestampValueOffset = partitionOffset + 4
+                                + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
+                        intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
+                        if (beginIntakeTimestamp == 0) {
+                            // First tuple seen: remember its intake time as the window origin.
+                            beginIntakeTimestamp = intakeTimestamp;
+                            LOGGER.warning("Begin Timestamp: " + beginIntakeTimestamp);
+                        }
+
+                        updateRunningAvg(intakeTimestamp, currentTime);
+
+                        int storeTimestampValueOffset = intakeTimestampValueOffset + 8
+                                + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
+                        // Stamp the store time in place, inside the frame.
+                        frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
+                    }
+                    logRunningAvg();
+                    resetRunningAvg();
+                }
+            }
+
+            /**
+             * Adds the tuple's delay (currentTime - intakeTimestamp) to the
+             * window selected by how long after the first tuple it was taken in.
+             */
+            private void updateRunningAvg(long intakeTimestamp, long currentTime) {
+                long diffTimestamp = intakeTimestamp - beginIntakeTimestamp;
+                long delay = (currentTime - intakeTimestamp);
+                if (diffTimestamp < NORMAL_WINDOW_LIMIT) {
+                    delayNormalWindow += delay;
+                    countNormalWindow++;
+                } else if (diffTimestamp < HIGH_WINDOW_LIMIT) {
+                    delayHighWindow += delay;
+                    countHighWindow++;
+                } else {
+                    delayLowWindow += delay;
+                    countLowWindow++;
+                }
+            }
+
+            /** Clears all per-frame delay sums and counts. */
+            private void resetRunningAvg() {
+                delayNormalWindow = 0;
+                countNormalWindow = 0;
+                delayHighWindow = 0;
+                countHighWindow = 0;
+                delayLowWindow = 0;
+                countLowWindow = 0;
+            }
+
+            /**
+             * Logs the average delay of each window (0 = normal, 1 = high,
+             * 2 = low) that received tuples with a non-zero total delay.
+             */
+            private void logRunningAvg() {
+                if (countNormalWindow != 0 && delayNormalWindow != 0) {
+                    LOGGER.warning("Window:" + 0 + ":" + "Avg Travel_Time:" + (delayNormalWindow / countNormalWindow));
+                }
+                if (countHighWindow != 0 && delayHighWindow != 0) {
+                    LOGGER.warning("Window:" + 1 + ":" + "Avg Travel_Time:" + (delayHighWindow / countHighWindow));
+                }
+                if (countLowWindow != 0 && delayLowWindow != 0) {
+                    LOGGER.warning("Window:" + 2 + ":" + "Avg Travel_Time:" + (delayLowWindow / countLowWindow));
+                }
+            }
+
+        };
+    }
+
+    public boolean isAckingEnabled() {
+        return ackingEnabled;
+    }
+
+    /** Turns acking on or off after construction. */
+    public void setAcking(boolean ackingEnabled) {
+        this.ackingEnabled = ackingEnabled;
+    }
+
+    public boolean isTimeTrackingEnabled() {
+        return timeTrackingEnabled;
+    }
+
+    /** Always {@code true} for the storage side. */
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return true;
+    }
+
+    /** Always {@code true} for the storage side. */
+    @Override
+    protected boolean reportOutflowRate() {
+        return true;
+    }
+
+    /** Always {@code false} for the storage side. */
+    @Override
+    protected boolean reportInflowRate() {
+        return false;
+    }
+
+}