You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/15 19:03:56 UTC

[3/9] incubator-asterixdb git commit: Cleanup Feed CodeBase

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
deleted file mode 100644
index 67e6295..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-
-import org.apache.asterix.external.feed.api.IExceptionHandler;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.api.IFrameEventCallback;
-import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.external.feed.api.IFramePostProcessor;
-import org.apache.asterix.external.feed.api.IFramePreprocessor;
-import org.apache.asterix.external.feed.dataflow.DataBucket;
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
-import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.LogInputOutputRateTask;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
-import org.apache.asterix.external.util.FeedFrameUtil;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
-
-    protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds
-    protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second
-    protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds
-
-    protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 10th frame
-
-    protected final IHyracksTaskContext ctx;
-    protected final FeedConnectionId connectionId;
-    protected final FeedRuntimeId runtimeId;
-    protected final FrameTupleAccessor inflowFta;
-    protected final FrameTupleAccessor outflowFta;
-    protected final FeedRuntimeInputHandler inputHandler;
-    protected final IFrameEventCallback callback;
-    protected final Timer timer;
-    private final IExceptionHandler exceptionHandler;
-    protected final FeedPolicyAccessor policyAccessor;
-    protected int nPartitions;
-
-    private IFrameWriter frameWriter;
-    protected IFeedMetricCollector metricCollector;
-    protected boolean monitorProcessingRate = false;
-    protected boolean monitorInputQueueLength = false;
-    protected boolean logInflowOutflowRate = false;
-    protected boolean reportOutflowRate = false;
-    protected boolean reportInflowRate = false;
-
-    protected int inflowReportSenderId = -1;
-    protected int outflowReportSenderId = -1;
-    protected TimerTask monitorInputQueueLengthTask;
-    protected TimerTask processingRateTask;
-    protected TimerTask logInflowOutflowRateTask;
-    protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
-    protected StorageFrameHandler storageFromeHandler;
-
-    protected int processingRate = -1;
-    protected int frameCount = 0;
-    private long avgDelayPersistence = 0;
-    private boolean active;
-    private Map<Integer, Long> tupleTimeStats;
-    IFramePostProcessor postProcessor = null;
-    IFramePreprocessor preProcessor = null;
-
-    public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
-            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
-            FeedPolicyAccessor policyAccessor) {
-        switch (runtimeId.getFeedRuntimeType()) {
-            case COMPUTE:
-                return new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-            case STORE:
-                return new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-            case COLLECT:
-                return new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-            default:
-                return new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-        }
-    }
-
-    protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
-            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
-            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
-            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
-        this.ctx = ctx;
-        this.connectionId = connectionId;
-        this.frameWriter = frameWriter;
-        this.inflowFta = new FrameTupleAccessor(recordDesc);
-        this.outflowFta = new FrameTupleAccessor(recordDesc);
-        this.runtimeId = runtimeId;
-        this.metricCollector = metricCollector;
-        this.exceptionHandler = exceptionHandler;
-        this.callback = callback;
-        this.inputHandler = inputHandler;
-        this.timer = new Timer();
-        this.policyAccessor = policyAccessor;
-        this.nPartitions = nPartitions;
-        this.active = true;
-        initializeMonitoring();
-    }
-
-    protected abstract boolean monitorProcessingRate();
-
-    protected abstract boolean logInflowOutflowRate();
-
-    protected abstract boolean reportOutflowRate();
-
-    protected abstract boolean reportInflowRate();
-
-    protected abstract boolean monitorInputQueueLength();
-
-    protected abstract IFramePreprocessor getFramePreProcessor();
-
-    protected abstract IFramePostProcessor getFramePostProcessor();
-
-    protected void initializeMonitoring() {
-        monitorProcessingRate = monitorProcessingRate();
-        monitorInputQueueLength = monitorInputQueueLength();
-        reportInflowRate = reportInflowRate();
-        reportOutflowRate = reportOutflowRate();
-        logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
-
-        if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
-            this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
-                    connectionId, nPartitions);
-            this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
-        }
-
-        if (monitorInputQueueLength && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
-                || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
-            this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
-            this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
-        }
-
-        if (reportInflowRate || reportOutflowRate) {
-            this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
-                    reportOutflowRate);
-            this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
-            this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
-                    ValueType.INFLOW_RATE, MetricType.RATE);
-            this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
-                    ValueType.OUTFLOW_RATE, MetricType.RATE);
-        }
-    }
-
-    protected void deinitializeMonitoring() {
-        if (monitorInputQueueLengthTask != null) {
-            monitorInputQueueLengthTask.cancel();
-        }
-        if (processingRateTask != null) {
-            processingRateTask.cancel();
-        }
-        if (reportInflowRate || reportOutflowRate) {
-            metricCollector.removeReportSender(inflowReportSenderId);
-            metricCollector.removeReportSender(outflowReportSenderId);
-            logInflowOutflowRateTask.cancel();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Disabled monitoring for " + this.runtimeId);
-        }
-    }
-
-    protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
-        if (monitorProcessingRate) {
-            frameCount++;
-            if (frameCount % PROCESS_RATE_REFRESH == 0) {
-                long endTime = System.currentTimeMillis();
-                processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / (endTime - startTime));
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
-                }
-                frameCount = 0;
-            }
-        }
-
-        if (logInflowOutflowRate || reportOutflowRate) {
-            metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
-        }
-
-        postProcessFrame(frame);
-
-    }
-
-    protected void preProcessFrame(ByteBuffer frame) throws Exception {
-        if (preProcessor == null) {
-            preProcessor = getFramePreProcessor();
-        }
-        if (preProcessor != null) {
-            preProcessor.preProcess(frame);
-        }
-    }
-
-    protected void postProcessFrame(ByteBuffer frame) throws Exception {
-        if (postProcessor == null) {
-            postProcessor = getFramePostProcessor();
-        }
-        if (postProcessor != null) {
-            outflowFta.reset(frame);
-            postProcessor.postProcessFrame(frame, outflowFta);
-        }
-    }
-
-    @Override
-    public void sendMessage(DataBucket message) {
-        inbox.add(message);
-    }
-
-    public void sendReport(ByteBuffer frame) {
-        if ((reportInflowRate) && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG)
-                || inputHandler.getMode().equals(Mode.PROCESS_SPILL))) {
-            inflowFta.reset(frame);
-            metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
-        }
-    }
-
-    /** return rate in terms of tuples/sec **/
-    public int getInflowRate() {
-        return metricCollector.getMetric(inflowReportSenderId);
-    }
-
-    /** return rate in terms of tuples/sec **/
-    public int getOutflowRate() {
-        return metricCollector.getMetric(outflowReportSenderId);
-    }
-
-    /** return the number of pending frames from the input queue **/
-    public int getWorkSize() {
-        return inbox.size();
-    }
-
-    /** reset the number of partitions (cardinality) for the runtime **/
-    public void setNumberOfPartitions(int nPartitions) {
-        if (processingRateTask != null) {
-            int currentPartitions = ((MonitoreProcessRateTimerTask) processingRateTask).getNumberOfPartitions();
-            if (currentPartitions != nPartitions) {
-                ((MonitoreProcessRateTimerTask) processingRateTask).setNumberOfPartitions(nPartitions);
-            }
-        }
-    }
-
-    public FeedRuntimeInputHandler getInputHandler() {
-        return inputHandler;
-    }
-
-    public synchronized void close(boolean processPending, boolean disableMonitoring) {
-        super.close(processPending);
-        if (disableMonitoring) {
-            deinitializeMonitoring();
-        }
-        active = false;
-    }
-
-    @Override
-    public synchronized void processMessage(DataBucket message) throws Exception {
-        if (!active) {
-            message.doneReading();
-            return;
-        }
-        switch (message.getContentType()) {
-            case DATA:
-                boolean finishedProcessing = false;
-                ByteBuffer frameReceived = message.getContent();
-                ByteBuffer frameToProcess = null;
-                if (inputHandler.isThrottlingEnabled()) {
-                    inflowFta.reset(frameReceived);
-                    int pRate = getProcessingRate();
-                    int inflowRate = getInflowRate();
-                    if (inflowRate > pRate) {
-                        double retainFraction = (pRate * 0.8 / inflowRate);
-                        frameToProcess = throttleFrame(inflowFta, retainFraction);
-                        inflowFta.reset(frameToProcess);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Throttling at fraction " + retainFraction + "inflow rate " + inflowRate
-                                    + " no of tuples remaining " + inflowFta.getTupleCount());
-
-                        }
-                    } else {
-                        frameToProcess = frameReceived;
-                    }
-                } else {
-                    frameToProcess = frameReceived;
-                }
-                outflowFta.reset(frameToProcess);
-                long startTime = 0;
-                while (!finishedProcessing) {
-                    try {
-                        inflowFta.reset(frameToProcess);
-                        startTime = System.currentTimeMillis();
-                        preProcessFrame(frameToProcess);
-                        frameWriter.nextFrame(frameToProcess);
-                        postProcessFrame(startTime, frameToProcess);
-                        finishedProcessing = true;
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        frameToProcess = exceptionHandler.handleException(e, frameToProcess);
-                        finishedProcessing = true;
-                    }
-                }
-                message.doneReading();
-                break;
-            case EOD:
-                message.doneReading();
-                timer.cancel();
-                callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
-                break;
-            case EOSD:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Done processing spillage");
-                }
-                message.doneReading();
-                callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
-                break;
-
-        }
-    }
-
-    private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
-        int desiredTuples = (int) (fta.getTupleCount() * retainFraction);
-        return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
-    }
-
-    public Mode getMode() {
-        return inputHandler.getMode();
-    }
-
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    public void setFrameWriter(IFrameWriter frameWriter) {
-        this.frameWriter = frameWriter;
-    }
-
-    public void reset() {
-        active = true;
-        if (logInflowOutflowRate) {
-            metricCollector.resetReportSender(inflowReportSenderId);
-            metricCollector.resetReportSender(outflowReportSenderId);
-        }
-    }
-
-    public int getProcessingRate() {
-        return processingRate;
-    }
-
-    public Map<Integer, Long> getTupleTimeStats() {
-        return tupleTimeStats;
-    }
-
-    public long getAvgDelayRecordPersistence() {
-        return avgDelayPersistence;
-    }
-
-    public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
-        return storageTimeTrackingRateTask;
-    }
-
-    @Override
-    public void emptyInbox() throws HyracksDataException {
-        inputHandler.flush();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
deleted file mode 100644
index 86c6bca..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.feed.api.IFrameEventCallback;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-
-public class MonitoredBufferTimerTasks {
-
-    private static final Logger LOGGER = Logger.getLogger(MonitorInputQueueLengthTimerTask.class.getName());
-
-    public static class MonitoredBufferStorageTimerTask extends TimerTask {
-
-        private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
-
-        private final StorageSideMonitoredBuffer mBuffer;
-        private final IFeedManager feedManager;
-        private final int partition;
-        private final FeedConnectionId connectionId;
-        private final FeedPolicyAccessor policyAccessor;
-        private final StorageFrameHandler storageFromeHandler;
-        private final StorageReportFeedMessage storageReportMessage;
-        private final FeedTupleCommitAckMessage tupleCommitAckMessage;
-
-        private Map<Integer, Integer> maxIntakeBaseCovered;
-        private int countDelayExceeded = 0;
-
-        public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
-                FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
-                StorageFrameHandler storageFromeHandler) {
-            this.mBuffer = mBuffer;
-            this.feedManager = feedManager;
-            this.connectionId = connectionId;
-            this.partition = partition;
-            this.policyAccessor = policyAccessor;
-            this.storageFromeHandler = storageFromeHandler;
-            this.storageReportMessage = new StorageReportFeedMessage(this.connectionId, this.partition, 0, false, 0, 0);
-            this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(this.connectionId, 0, 0, null);
-            this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
-        }
-
-        @Override
-        public void run() {
-            if (mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled()) {
-                ackRecords();
-            }
-            if (mBuffer.isTimeTrackingEnabled()) {
-                checkLatencyViolation();
-            }
-        }
-
-        private void ackRecords() {
-            Set<Integer> partitions = storageFromeHandler.getPartitionsWithStats();
-            List<Integer> basesCovered = new ArrayList<Integer>();
-            for (int intakePartition : partitions) {
-                Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
-                        .getBaseAcksForPartition(intakePartition);
-                for (Entry<Integer, IntakePartitionStatistics> entry : baseAcks.entrySet()) {
-                    int base = entry.getKey();
-                    IntakePartitionStatistics stats = entry.getValue();
-                    Integer maxIntakeBaseForPartition = maxIntakeBaseCovered.get(intakePartition);
-                    if (maxIntakeBaseForPartition == null || maxIntakeBaseForPartition < base) {
-                        tupleCommitAckMessage.reset(intakePartition, base, stats.getAckInfo());
-                        feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
-                    } else {
-                        basesCovered.add(base);
-                    }
-                }
-                for (Integer b : basesCovered) {
-                    baseAcks.remove(b);
-                }
-                basesCovered.clear();
-            }
-        }
-
-        private void checkLatencyViolation() {
-            long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
-            if (avgDelayPersistence > policyAccessor.getMaxDelayRecordPersistence()) {
-                countDelayExceeded++;
-                if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
-                    storageReportMessage.reset(0, false, mBuffer.getAvgDelayRecordPersistence());
-                    feedManager.getFeedMessageService().sendMessage(storageReportMessage);
-                }
-            } else {
-                countDelayExceeded = 0;
-            }
-        }
-
-        public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
-            maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
-        }
-    }
-
-    public static class LogInputOutputRateTask extends TimerTask {
-
-        private final MonitoredBuffer mBuffer;
-        private final boolean log;
-        private final boolean reportInflow;
-        private final boolean reportOutflow;
-
-        private final IFeedMessageService messageService;
-        private final FeedReportMessage message;
-
-        public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
-            this.mBuffer = mBuffer;
-            this.log = log;
-            this.reportInflow = reportInflow;
-            this.reportOutflow = reportOutflow;
-            if (reportInflow || reportOutflow) {
-                ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
-                messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
-                message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
-                        vType, 0);
-            } else {
-                messageService = null;
-                message = null;
-            }
-
-        }
-
-        @Override
-        public void run() {
-            int pendingWork = mBuffer.getWorkSize();
-            int outflowRate = mBuffer.getOutflowRate();
-            int inflowRate = mBuffer.getInflowRate();
-            if (log) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
-                            + outflowRate + " Pending Work " + pendingWork);
-                }
-            }
-            if (reportInflow) {
-                message.reset(inflowRate);
-            } else if (reportOutflow) {
-                message.reset(outflowRate);
-            }
-            messageService.sendMessage(message);
-        }
-    }
-
-    public static class MonitorInputQueueLengthTimerTask extends TimerTask {
-
-        private final MonitoredBuffer mBuffer;
-        private final IFrameEventCallback callback;
-        private final int pendingWorkThreshold;
-        private final int maxSuccessiveThresholdPeriods;
-        private FrameEvent lastEvent = FrameEvent.NO_OP;
-        private int pendingWorkExceedCount = 0;
-
-        public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
-            this.mBuffer = mBuffer;
-            this.callback = callback;
-            AsterixFeedProperties props = mBuffer.getInputHandler().getFeedManager().getAsterixFeedProperties();
-            pendingWorkThreshold = props.getPendingWorkThreshold();
-            maxSuccessiveThresholdPeriods = props.getMaxSuccessiveThresholdPeriod();
-        }
-
-        @Override
-        public void run() {
-            int pendingWork = mBuffer.getWorkSize();
-            if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
-                return;
-            }
-
-            switch (lastEvent) {
-                case NO_OP:
-                case PENDING_WORK_DONE:
-                case FINISHED_PROCESSING_SPILLAGE:
-                    if (pendingWork > pendingWorkThreshold) {
-                        pendingWorkExceedCount++;
-                        if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
-                            pendingWorkExceedCount = 0;
-                            lastEvent = FrameEvent.PENDING_WORK_THRESHOLD_REACHED;
-                            callback.frameEvent(lastEvent);
-                        }
-                    } else if (pendingWork == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
-                        lastEvent = FrameEvent.PENDING_WORK_DONE;
-                        callback.frameEvent(lastEvent);
-                    }
-                    break;
-                case PENDING_WORK_THRESHOLD_REACHED:
-                    if (((pendingWork * 1.0) / pendingWorkThreshold) <= 0.5) {
-                        lastEvent = FrameEvent.PENDING_WORK_DONE;
-                        callback.frameEvent(lastEvent);
-                    }
-                    break;
-                case FINISHED_PROCESSING:
-                    break;
-
-            }
-        }
-    }
-
-    /**
-     * A timer task to measure and compare the processing rate and inflow rate
-     * to look for possibility to scale-in, that is reduce the degree of cardinality
-     * of the compute operator.
-     */
-    public static class MonitoreProcessRateTimerTask extends TimerTask {
-
-        private final MonitoredBuffer mBuffer;
-        private final IFeedManager feedManager;
-        private int nPartitions;
-        private ScaleInReportMessage sMessage;
-        private boolean proposedChange;
-
-        public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
-                FeedConnectionId connectionId, int nPartitions) {
-            this.mBuffer = mBuffer;
-            this.feedManager = feedManager;
-            this.nPartitions = nPartitions;
-            this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
-            this.proposedChange = false;
-        }
-
-        public int getNumberOfPartitions() {
-            return nPartitions;
-        }
-
-        public void setNumberOfPartitions(int nPartitions) {
-            this.nPartitions = nPartitions;
-            proposedChange = false;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
-            }
-        }
-
-        @Override
-        public void run() {
-            if (!proposedChange) {
-                int inflowRate = mBuffer.getInflowRate();
-                int procRate = mBuffer.getProcessingRate();
-                if (inflowRate > 0 && procRate > 0) {
-                    if (inflowRate < procRate) {
-                        int possibleCardinality = (int) Math.ceil(nPartitions * inflowRate / (double) procRate);
-                        if (possibleCardinality < nPartitions
-                                && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= 0.25)) {
-                            sMessage.reset(nPartitions, possibleCardinality);
-                            feedManager.getFeedMessageService().sendMessage(sMessage);
-                            proposedChange = true;
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Proposed scale-in " + sMessage);
-                            }
-                        }
-                    } else {
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
-                                    + ")");
-                        }
-                    }
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
deleted file mode 100644
index d3919b5..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-
-public class NodeLoad implements Comparable<NodeLoad> {
-
-    private final String nodeId;
-
-    private int nRuntimes;
-
-    public NodeLoad(String nodeId) {
-        this.nodeId = nodeId;
-        this.nRuntimes = 0;
-    }
-
-    public void addLoad() {
-        nRuntimes++;
-    }
-
-    public void removeLoad(FeedRuntimeType runtimeType) {
-        nRuntimes--;
-    }
-
-    @Override
-    public int compareTo(NodeLoad o) {
-        if (this == o) {
-            return 0;
-        }
-        return nRuntimes - o.getnRuntimes();
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public int getnRuntimes() {
-        return nRuntimes;
-    }
-
-    public void setnRuntimes(int nRuntimes) {
-        this.nRuntimes = nRuntimes;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
deleted file mode 100644
index bfddcf6..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.util.FeedConstants;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class NodeLoadReport implements Comparable<NodeLoadReport> {
-
-    private final String nodeId;
-    private float cpuLoad;
-    private double usedHeap;
-    private int nRuntimes;
-
-    public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
-        this.nodeId = nodeId;
-        this.cpuLoad = cpuLoad;
-        this.usedHeap = usedHeap;
-        this.nRuntimes = nRuntimes;
-    }
-
-    public static NodeLoadReport read(JSONObject obj) throws JSONException {
-        NodeLoadReport r = new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
-                (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
-                (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
-                obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
-        return r;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof NodeLoadReport)) {
-            return false;
-        }
-        return ((NodeLoadReport) o).nodeId.equals(nodeId);
-    }
-
-    @Override
-    public int hashCode() {
-        return nodeId.hashCode();
-    }
-
-    @Override
-    public int compareTo(NodeLoadReport o) {
-        if (nRuntimes != o.getnRuntimes()) {
-            return nRuntimes - o.getnRuntimes();
-        } else {
-            return (int) (this.cpuLoad - ((NodeLoadReport) o).cpuLoad);
-        }
-    }
-
-    public float getCpuLoad() {
-        return cpuLoad;
-    }
-
-    public void setCpuLoad(float cpuLoad) {
-        this.cpuLoad = cpuLoad;
-    }
-
-    public double getUsedHeap() {
-        return usedHeap;
-    }
-
-    public void setUsedHeap(double usedHeap) {
-        this.usedHeap = usedHeap;
-    }
-
-    public int getnRuntimes() {
-        return nRuntimes;
-    }
-
-    public void setnRuntimes(int nRuntimes) {
-        this.nRuntimes = nRuntimes;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
deleted file mode 100644
index f651935..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.OperatingSystemMXBean;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.feed.api.IFeedService;
-import org.apache.asterix.external.feed.message.NodeReportMessage;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-public class NodeLoadReportService implements IFeedService {
-
-    private static final int NODE_LOAD_REPORT_FREQUENCY = 2000;
-    private static final float CPU_CHANGE_THRESHOLD = 0.2f;
-    private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
-
-    private final NodeLoadReportTask task;
-    private final Timer timer;
-
-    public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
-        this.task = new NodeLoadReportTask(nodeId, feedManager);
-        this.timer = new Timer();
-    }
-
-    @Override
-    public void start() throws Exception {
-        timer.schedule(task, 0, NODE_LOAD_REPORT_FREQUENCY);
-    }
-
-    @Override
-    public void stop() {
-        timer.cancel();
-    }
-
-    private static class NodeLoadReportTask extends TimerTask {
-
-        private final IFeedManager feedManager;
-        private final NodeReportMessage message;
-        private final IFeedMessageService messageService;
-
-        private static OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
-        private static MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
-
-        public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
-            this.feedManager = feedManager;
-            this.message = new NodeReportMessage(0.0f, 0L, 0);
-            this.messageService = feedManager.getFeedMessageService();
-        }
-
-        @Override
-        public void run() {
-            List<FeedRuntimeId> runtimeIds = feedManager.getFeedConnectionManager().getRegisteredRuntimes();
-            int nRuntimes = runtimeIds.size();
-            double cpuLoad = getCpuLoad();
-            double usedHeap = getUsedHeap();
-            if (sendMessage(nRuntimes, cpuLoad, usedHeap)) {
-                message.reset(cpuLoad, usedHeap, nRuntimes);
-                messageService.sendMessage(message);
-            }
-        }
-
-        private boolean sendMessage(int nRuntimes, double cpuLoad, double usedHeap) {
-            if (message == null) {
-                return true;
-            }
-
-            boolean changeInCpu = (Math.abs(cpuLoad - message.getCpuLoad())
-                    / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
-            boolean changeInUsedHeap = (Math.abs(usedHeap - message.getUsedHeap())
-                    / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
-            boolean changeInRuntimeSize = nRuntimes != message.getnRuntimes();
-            return changeInCpu || changeInUsedHeap || changeInRuntimeSize;
-        }
-
-        private double getCpuLoad() {
-            return osBean.getSystemLoadAverage();
-        }
-
-        private double getUsedHeap() {
-            return ((double) memBean.getHeapMemoryUsage().getUsed()) / memBean.getHeapMemoryUsage().getMax();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
deleted file mode 100644
index ec95371..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-
-public abstract class Series {
-
-    protected final MetricType type;
-    protected int runningSum;
-
-    public Series(MetricType type) {
-        this.type = type;
-    }
-
-    public abstract void addValue(int value);
-
-    public int getRunningSum() {
-        return runningSum;
-    }
-
-    public MetricType getType() {
-        return type;
-    }
-
-    public abstract void reset();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
deleted file mode 100644
index 6182753..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-
-public class SeriesAvg extends Series {
-
-    private int count;
-
-    public SeriesAvg() {
-        super(MetricType.AVG);
-    }
-
-    public int getAvg() {
-        return runningSum / count;
-    }
-
-    public synchronized void addValue(int value) {
-        if (value < 0) {
-            return;
-        }
-        runningSum += value;
-        count++;
-    }
-
-    public  void reset(){
-        count = 0;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
deleted file mode 100644
index 91eea87..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-
-public class SeriesRate extends Series {
-
-    private static final long REFRESH_MEASUREMENT = 5000; // 5 seconds
-
-    private int rate;
-    private Timer timer;
-    private RateComputingTask task;
-
-    public SeriesRate() {
-        super(MetricType.RATE);
-        begin();
-    }
-
-    public int getRate() {
-        return rate;
-    }
-
-    public synchronized void addValue(int value) {
-        if (value < 0) {
-            return;
-        }
-        runningSum += value;
-    }
-
-    public void begin() {
-        if (timer == null) {
-            timer = new Timer();
-            task = new RateComputingTask(this);
-            timer.scheduleAtFixedRate(task, 0, REFRESH_MEASUREMENT);
-        }
-    }
-
-    public void end() {
-        if (timer != null) {
-            timer.cancel();
-        }
-    }
-
-    public void reset() {
-        rate = 0;
-        if (task != null) {
-            task.reset();
-        }
-    }
-
-    private class RateComputingTask extends TimerTask {
-
-        private int lastMeasured = 0;
-        private final SeriesRate series;
-
-        public RateComputingTask(SeriesRate series) {
-            this.series = series;
-        }
-
-        @Override
-        public void run() {
-            int currentValue = series.getRunningSum();
-            rate = (int) (((currentValue - lastMeasured) * 1000) / REFRESH_MEASUREMENT);
-            lastMeasured = currentValue;
-        }
-
-        public void reset() {
-            lastMeasured = 0;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
deleted file mode 100644
index 9db930e..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.feed.api.IExceptionHandler;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector;
-import org.apache.asterix.external.feed.api.IFrameEventCallback;
-import org.apache.asterix.external.feed.api.IFramePostProcessor;
-import org.apache.asterix.external.feed.api.IFramePreprocessor;
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
-import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class StorageSideMonitoredBuffer extends MonitoredBuffer {
-
-    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000;
-
-    private boolean ackingEnabled;
-    private final boolean timeTrackingEnabled;
-
-    public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
-            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
-            FeedPolicyAccessor policyAccessor) {
-        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
-                exceptionHandler, callback, nPartitions, policyAccessor);
-        timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
-        ackingEnabled = policyAccessor.atleastOnceSemantics();
-        if (ackingEnabled || timeTrackingEnabled) {
-            storageFromeHandler = new StorageFrameHandler();
-            this.storageTimeTrackingRateTask = new MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask(this,
-                    inputHandler.getFeedManager(), connectionId, runtimeId.getPartition(), policyAccessor,
-                    storageFromeHandler);
-            this.timer.scheduleAtFixedRate(storageTimeTrackingRateTask, 0, STORAGE_TIME_TRACKING_FREQUENCY);
-        }
-    }
-
-    @Override
-    protected boolean monitorProcessingRate() {
-        return false;
-    }
-
-    @Override
-    protected boolean logInflowOutflowRate() {
-        return true;
-    }
-
-    @Override
-    public IFramePreprocessor getFramePreProcessor() {
-        return new IFramePreprocessor() {
-
-            @Override
-            public void preProcess(ByteBuffer frame) {
-                try {
-                    if (ackingEnabled) {
-                        storageFromeHandler.updateTrackingInformation(frame, inflowFta);
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        };
-    }
-
-    @Override
-    protected IFramePostProcessor getFramePostProcessor() {
-        return new IFramePostProcessor() {
-
-            private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
-            private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
-
-            private long delayNormalWindow = 0;
-            private long delayHighWindow = 0;
-            private long delayLowWindow = 0;
-
-            private int countNormalWindow;
-            private int countHighWindow;
-            private int countLowWindow;
-
-            private long beginIntakeTimestamp = 0;
-
-            @Override
-            public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
-                if (ackingEnabled || timeTrackingEnabled) {
-                    int nTuples = frameAccessor.getTupleCount();
-                    long intakeTimestamp;
-                    long currentTime = System.currentTimeMillis();
-                    for (int i = 0; i < nTuples; i++) {
-                        int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
-                        int openPartOffsetOrig = frame.getInt(recordStart + 6);
-                        int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
-                        int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
-                                + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
-
-                        int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
-                                + 1;
-
-                        int intakeTimestampValueOffset = partitionOffset + 4
-                                + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
-                        intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
-                        if (beginIntakeTimestamp == 0) {
-                            beginIntakeTimestamp = intakeTimestamp;
-                            LOGGER.warning("Begin Timestamp: " + beginIntakeTimestamp);
-                        }
-
-                        updateRunningAvg(intakeTimestamp, currentTime);
-
-                        int storeTimestampValueOffset = intakeTimestampValueOffset + 8
-                                + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
-                        frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
-                    }
-                    logRunningAvg();
-                    resetRunningAvg();
-                }
-            }
-
-            private void updateRunningAvg(long intakeTimestamp, long currentTime) {
-                long diffTimestamp = intakeTimestamp - beginIntakeTimestamp;
-                long delay = (currentTime - intakeTimestamp);
-                if (diffTimestamp < NORMAL_WINDOW_LIMIT) {
-                    delayNormalWindow += delay;
-                    countNormalWindow++;
-                } else if (diffTimestamp < HIGH_WINDOW_LIMIT) {
-                    delayHighWindow += delay;
-                    countHighWindow++;
-                } else {
-                    delayLowWindow += delay;
-                    countLowWindow++;
-                }
-            }
-
-            private void resetRunningAvg() {
-                delayNormalWindow = 0;
-                countNormalWindow = 0;
-                delayHighWindow = 0;
-                countHighWindow = 0;
-                delayLowWindow = 0;
-                countLowWindow = 0;
-            }
-
-            private void logRunningAvg() {
-                if (countNormalWindow != 0 && delayNormalWindow != 0) {
-                    LOGGER.warning("Window:" + 0 + ":" + "Avg Travel_Time:" + (delayNormalWindow / countNormalWindow));
-                }
-                if (countHighWindow != 0 && delayHighWindow != 0) {
-                    LOGGER.warning("Window:" + 1 + ":" + "Avg Travel_Time:" + (delayHighWindow / countHighWindow));
-                }
-                if (countLowWindow != 0 && delayLowWindow != 0) {
-                    LOGGER.warning("Window:" + 2 + ":" + "Avg Travel_Time:" + (delayLowWindow / countLowWindow));
-                }
-            }
-
-        };
-    }
-
-    public boolean isAckingEnabled() {
-        return ackingEnabled;
-    }
-
-    public void setAcking(boolean ackingEnabled) {
-        this.ackingEnabled = ackingEnabled;
-    }
-
-    public boolean isTimeTrackingEnabled() {
-        return timeTrackingEnabled;
-    }
-
-    @Override
-    protected boolean monitorInputQueueLength() {
-        return true;
-    }
-
-    @Override
-    protected boolean reportOutflowRate() {
-        return true;
-    }
-
-    @Override
-    protected boolean reportInflowRate() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index ce1d893..26f8654 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -116,14 +116,14 @@ public class SocketServerInputStream extends AsterixInputStream {
             }
             socket = null;
         } catch (IOException e) {
-            hde = ExternalDataExceptionUtils.suppress(hde, e);
+            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
         }
         try {
             if (server != null) {
                 server.close();
             }
         } catch (IOException e) {
-            hde = ExternalDataExceptionUtils.suppress(hde, e);
+            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
         } finally {
             server = null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 9485b77..36098ee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -19,17 +19,14 @@
 package org.apache.asterix.external.operators;
 
 import java.util.Map;
-import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.feed.management.FeedManager;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -48,7 +45,6 @@ import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescri
 public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
 
     /** The type associated with the ADM data output from (the feed adapter OR the compute operator) */
     private final IAType outputType;
@@ -59,9 +55,6 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     /** Map representation of policy parameters */
     private final Map<String, String> feedPolicyProperties;
 
-    /** The (singleton) instance of {@code IFeedIngestionManager} **/
-    private IFeedSubscriptionManager subscriptionManager;
-
     /** The source feed from which the feed derives its data from. **/
     private final FeedId sourceFeedId;
 
@@ -72,7 +65,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
             ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
             FeedRuntimeType subscriptionLocation) {
         super(spec, 0, 1);
-        recordDescriptors[0] = rDesc;
+        this.recordDescriptors[0] = rDesc;
         this.outputType = atype;
         this.connectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;
@@ -84,22 +77,11 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.subscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager()).getFeedSubscriptionManager();
-        ISubscribableRuntime sourceRuntime = null;
-        SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-                subscriptionLocation, partition);
-        switch (subscriptionLocation) {
-            case INTAKE:
-                sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
-                break;
-            case COMPUTE:
-                sourceRuntime = subscriptionManager.getSubscribableRuntime(feedSubscribableRuntimeId);
-                break;
-            default:
-                throw new HyracksDataException("Can't subscirbe to FeedRuntime with Type: " + subscriptionLocation);
-        }
+        FeedManager feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject()).getFeedManager();
+        FeedRuntimeId sourceRuntimeId =
+                new FeedRuntimeId(sourceFeedId, subscriptionLocation, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+        ISubscribableRuntime sourceRuntime = feedManager.getSubscribableRuntime(sourceRuntimeId);
         return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId, feedPolicyProperties, partition,
                 nPartitions, sourceRuntime);
     }
@@ -124,10 +106,6 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
         return sourceFeedId;
     }
 
-    private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
-        return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-    }
-
     public FeedRuntimeType getSubscriptionLocation() {
         return subscriptionLocation;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 178d2d5..aeea6ba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -19,29 +19,21 @@
 package org.apache.asterix.external.operators;
 
 import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
 import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.dataflow.CollectTransformFeedFrameWriter;
-import org.apache.asterix.external.feed.dataflow.FeedCollectRuntimeInputHandler;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedManager;
 import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -50,19 +42,14 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNod
  * The first operator in a collect job in a feed.
  */
 public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-    private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
 
     private final int partition;
     private final FeedConnectionId connectionId;
     private final Map<String, String> feedPolicy;
     private final FeedPolicyAccessor policyAccessor;
-    private final IFeedManager feedManager;
+    private final FeedManager feedManager;
     private final ISubscribableRuntime sourceRuntime;
     private final IHyracksTaskContext ctx;
-    private final int nPartitions;
-
-    private RecordDescriptor outputRecordDescriptor;
-    private FeedRuntimeInputHandler inputSideHandler;
     private CollectionRuntime collectRuntime;
 
     public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
@@ -70,139 +57,38 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
             ISubscribableRuntime sourceRuntime) {
         this.ctx = ctx;
         this.partition = partition;
-        this.nPartitions = nPartitions;
         this.connectionId = feedConnectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
         this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
-        this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+        this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
     }
 
     @Override
     public void initialize() throws HyracksDataException {
         try {
-            outputRecordDescriptor = recordDesc;
-            switch (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
-                case INTAKE:
-                    handleCompleteConnection();
-                    // Notify CC that Collection started
-                    ctx.sendApplicationMessageToCC(
-                            new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()),
-                            null);
-                    break;
-                case COMPUTE:
-                    handlePartialConnection();
-                    break;
-                default:
-                    throw new IllegalStateException("Invalid source type "
-                            + ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType());
-            }
-            State state = collectRuntime.waitTillCollectionOver();
-            if (state.equals(State.FINISHED)) {
-                feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
-                        collectRuntime.getRuntimeId());
-                writer.close();
-                inputSideHandler.close();
-            } else if (state.equals(State.HANDOVER)) {
-                inputSideHandler.setMode(Mode.STALL);
-                writer.close();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
-                            + " and the output writer " + writer + " has been closed ");
-                }
+            FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT, partition,
+                    FeedRuntimeId.DEFAULT_TARGET_ID);
+            // Does this collector have a handler?
+            FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
+            if (policyAccessor.bufferingEnabled()) {
+                writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor,
+                        feedManager.getFeedMemoryManager());
+            } else {
+                writer = new SyncFeedRuntimeInputHandler(ctx, writer, tAccessor);
             }
-        } catch (InterruptedException ie) {
-            handleInterruptedException(ie);
+            collectRuntime = new CollectionRuntime(connectionId, runtimeId, sourceRuntime, feedPolicy, ctx,
+                    new FeedFrameCollector(policyAccessor, writer, connectionId));
+            feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+            sourceRuntime.subscribe(collectRuntime);
+            // Notify CC that Collection started
+            ctx.sendApplicationMessageToCC(
+                    new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()), null);
+            collectRuntime.waitTillCollectionOver();
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
         } catch (Exception e) {
-            e.printStackTrace();
             throw new HyracksDataException(e);
         }
     }
-
-    private void handleCompleteConnection() throws Exception {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
-                FeedRuntimeId.DEFAULT_OPERAND_ID);
-        collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
-                runtimeId);
-        if (collectRuntime == null) {
-            beginNewFeed(runtimeId);
-        } else {
-            reviveOldFeed();
-        }
-    }
-
-    private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
-        writer.open();
-        IFrameWriter outputSideWriter = writer;
-        if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()
-                .equals(FeedRuntimeType.COMPUTE)) {
-            outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
-                    connectionId);
-            this.recordDesc = sourceRuntime.getRecordDescriptor();
-        }
-
-        FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
-        inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter,
-                policyAccessor, false, tupleAccessor, recordDesc, feedManager, nPartitions);
-
-        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
-                sourceRuntime, feedPolicy, ctx);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
-        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
-    }
-
-    private void reviveOldFeed() throws HyracksDataException {
-        writer.open();
-        collectRuntime.getFrameCollector().setState(State.ACTIVE);
-        inputSideHandler = collectRuntime.getInputHandler();
-
-        IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
-        if (innerWriter instanceof CollectTransformFeedFrameWriter) {
-            ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
-        } else {
-            inputSideHandler.setCoreOperator(writer);
-        }
-
-        inputSideHandler.setMode(Mode.PROCESS);
-    }
-
-    private void handlePartialConnection() throws Exception {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
-                FeedRuntimeId.DEFAULT_OPERAND_ID);
-        writer.open();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
-        }
-        IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
-                outputRecordDescriptor, connectionId);
-
-        inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
-                new FrameTupleAccessor(recordDesc), recordDesc, feedManager, nPartitions);
-
-        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
-                feedPolicy, ctx);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
-        recordDesc = sourceRuntime.getRecordDescriptor();
-        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
-    }
-
-    private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
-        if (policyAccessor.continueOnHardwareFailure()) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
-                        + " until failure is resolved");
-            }
-            inputSideHandler.setMode(Mode.STALL);
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
-                        + " as feed is not configured to handle failures");
-            }
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
-            writer.close();
-            throw new HyracksDataException(ie);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index db11caa..b1fd7a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -21,16 +21,10 @@ package org.apache.asterix.external.operators;
 import java.util.Map;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -99,27 +93,18 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        IFeedSubscriptionManager feedSubscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager())
-                .getFeedSubscriptionManager();
-        SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
-                partition);
-        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
-                .getSubscribableRuntime(feedIngestionId);
         if (adaptorFactory == null) {
             try {
-                adaptorFactory = createExtenralAdapterFactory(ctx, partition);
+                adaptorFactory = createExternalAdapterFactory(ctx, partition);
             } catch (Exception exception) {
                 throw new HyracksDataException(exception);
             }
-
         }
-        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
-                policyAccessor, recordDescProvider, this);
+        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, policyAccessor,
+                recordDescProvider, this);
     }
 
-    private IAdapterFactory createExtenralAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
+    private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
         IAdapterFactory adapterFactory = null;
         ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
                 adaptorLibraryName);