You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:13 UTC

[19/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
deleted file mode 100644
index e5a22b5..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.LogInputOutputRateTask;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
-
-    protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds
-    protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second
-    protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds
-
-    protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 10th frame
-
-    protected final IHyracksTaskContext ctx;
-    protected final FeedConnectionId connectionId;
-    protected final FeedRuntimeId runtimeId;
-    protected final FrameTupleAccessor inflowFta;
-    protected final FrameTupleAccessor outflowFta;
-    protected final FeedRuntimeInputHandler inputHandler;
-    protected final IFrameEventCallback callback;
-    protected final Timer timer;
-    private final IExceptionHandler exceptionHandler;
-    protected final FeedPolicyAccessor policyAccessor;
-    protected int nPartitions;
-
-    private IFrameWriter frameWriter;
-    protected IFeedMetricCollector metricCollector;
-    protected boolean monitorProcessingRate = false;
-    protected boolean monitorInputQueueLength = false;
-    protected boolean logInflowOutflowRate = false;
-    protected boolean reportOutflowRate = false;
-    protected boolean reportInflowRate = false;
-
-    protected int inflowReportSenderId = -1;
-    protected int outflowReportSenderId = -1;
-    protected TimerTask monitorInputQueueLengthTask;
-    protected TimerTask processingRateTask;
-    protected TimerTask logInflowOutflowRateTask;
-    protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
-    protected StorageFrameHandler storageFromeHandler;
-
-    protected int processingRate = -1;
-    protected int frameCount = 0;
-    private long avgDelayPersistence = 0;
-    private boolean active;
-    private Map<Integer, Long> tupleTimeStats;
-    IFramePostProcessor postProcessor = null;
-    IFramePreprocessor preProcessor = null;
-
-    public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
-            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
-            FeedPolicyAccessor policyAccessor) {
-        switch (runtimeId.getFeedRuntimeType()) {
-            case COMPUTE:
-                return new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-            case STORE:
-                return new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-            case COLLECT:
-                return new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-            default:
-                return new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
-                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
-        }
-    }
-
-    protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
-            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
-            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
-            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
-        this.ctx = ctx;
-        this.connectionId = connectionId;
-        this.frameWriter = frameWriter;
-        this.inflowFta = new FrameTupleAccessor(recordDesc);
-        this.outflowFta = new FrameTupleAccessor(recordDesc);
-        this.runtimeId = runtimeId;
-        this.metricCollector = metricCollector;
-        this.exceptionHandler = exceptionHandler;
-        this.callback = callback;
-        this.inputHandler = inputHandler;
-        this.timer = new Timer();
-        this.policyAccessor = policyAccessor;
-        this.nPartitions = nPartitions;
-        this.active = true;
-        initializeMonitoring();
-    }
-
-    protected abstract boolean monitorProcessingRate();
-
-    protected abstract boolean logInflowOutflowRate();
-
-    protected abstract boolean reportOutflowRate();
-
-    protected abstract boolean reportInflowRate();
-
-    protected abstract boolean monitorInputQueueLength();
-
-    protected abstract IFramePreprocessor getFramePreProcessor();
-
-    protected abstract IFramePostProcessor getFramePostProcessor();
-
-    protected void initializeMonitoring() {
-        monitorProcessingRate = monitorProcessingRate();
-        monitorInputQueueLength = monitorInputQueueLength();
-        reportInflowRate = reportInflowRate();
-        reportOutflowRate = reportOutflowRate();
-        logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
-
-        if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
-            this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
-                    connectionId, nPartitions);
-            this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
-        }
-
-        if (monitorInputQueueLength && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
-                || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
-            this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
-            this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
-        }
-
-        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
-            this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
-                    reportOutflowRate);
-            this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
-            this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
-                    ValueType.INFLOW_RATE, MetricType.RATE);
-            this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
-                    ValueType.OUTFLOW_RATE, MetricType.RATE);
-        }
-    }
-
-    protected void deinitializeMonitoring() {
-        if (monitorInputQueueLengthTask != null) {
-            monitorInputQueueLengthTask.cancel();
-        }
-        if (processingRateTask != null) {
-            processingRateTask.cancel();
-        }
-        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
-            metricCollector.removeReportSender(inflowReportSenderId);
-            metricCollector.removeReportSender(outflowReportSenderId);
-            logInflowOutflowRateTask.cancel();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Disabled monitoring for " + this.runtimeId);
-        }
-    }
-
-    protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
-        if (monitorProcessingRate) {
-            frameCount++;
-            if (frameCount % PROCESS_RATE_REFRESH == 0) {
-                long endTime = System.currentTimeMillis();
-                processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / (endTime - startTime));
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
-                }
-                frameCount = 0;
-            }
-        }
-
-        if (logInflowOutflowRate || reportOutflowRate) {
-            metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
-        }
-
-        postProcessFrame(frame);
-
-    }
-
-    protected void preProcessFrame(ByteBuffer frame) throws Exception {
-        if (postProcessor == null) {
-            preProcessor = getFramePreProcessor();
-        }
-        if (preProcessor != null) {
-            preProcessor.preProcess(frame);
-        }
-    }
-
-    protected void postProcessFrame(ByteBuffer frame) throws Exception {
-        if (postProcessor == null) {
-            postProcessor = getFramePostProcessor();
-        }
-        if (postProcessor != null) {
-            outflowFta.reset(frame);
-            postProcessor.postProcessFrame(frame, outflowFta);
-        }
-    }
-
-    @Override
-    public void sendMessage(DataBucket message) {
-        inbox.add(message);
-    }
-
-    public void sendReport(ByteBuffer frame) {
-        if ((logInflowOutflowRate || reportInflowRate) && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG)
-                || inputHandler.getMode().equals(Mode.PROCESS_SPILL))) {
-            inflowFta.reset(frame);
-            metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
-        }
-    }
-
-    /** return rate in terms of tuples/sec **/
-    public int getInflowRate() {
-        return metricCollector.getMetric(inflowReportSenderId);
-    }
-
-    /** return rate in terms of tuples/sec **/
-    public int getOutflowRate() {
-        return metricCollector.getMetric(outflowReportSenderId);
-    }
-
-    /** return the number of pending frames from the input queue **/
-    public int getWorkSize() {
-        return inbox.size();
-    }
-
-    /** reset the number of partitions (cardinality) for the runtime **/
-    public void setNumberOfPartitions(int nPartitions) {
-        if (processingRateTask != null) {
-            int currentPartitions = ((MonitoreProcessRateTimerTask) processingRateTask).getNumberOfPartitions();
-            if (currentPartitions != nPartitions) {
-                ((MonitoreProcessRateTimerTask) processingRateTask).setNumberOfPartitions(nPartitions);
-            }
-        }
-    }
-
-    public FeedRuntimeInputHandler getInputHandler() {
-        return inputHandler;
-    }
-
-    public synchronized void close(boolean processPending, boolean disableMonitoring) {
-        super.close(processPending);
-        if (disableMonitoring) {
-            deinitializeMonitoring();
-        }
-        active = false;
-    }
-
-    @Override
-    public synchronized void processMessage(DataBucket message) throws Exception {
-        if (!active) {
-            message.doneReading();
-            return;
-        }
-        switch (message.getContentType()) {
-            case DATA:
-                boolean finishedProcessing = false;
-                ByteBuffer frameReceived = message.getContent();
-                ByteBuffer frameToProcess = null;
-                if (inputHandler.isThrottlingEnabled()) {
-                    inflowFta.reset(frameReceived);
-                    int pRate = getProcessingRate();
-                    int inflowRate = getInflowRate();
-                    if (inflowRate > pRate) {
-                        double retainFraction = (pRate * 0.8 / inflowRate);
-                        frameToProcess = throttleFrame(inflowFta, retainFraction);
-                        inflowFta.reset(frameToProcess);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Throttling at fraction " + retainFraction + "inflow rate " + inflowRate
-                                    + " no of tuples remaining " + inflowFta.getTupleCount());
-
-                        }
-                    } else {
-                        frameToProcess = frameReceived;
-                    }
-                } else {
-                    frameToProcess = frameReceived;
-                }
-                outflowFta.reset(frameToProcess);
-                long startTime = 0;
-                while (!finishedProcessing) {
-                    try {
-                        inflowFta.reset(frameToProcess);
-                        startTime = System.currentTimeMillis();
-                        preProcessFrame(frameToProcess);
-                        frameWriter.nextFrame(frameToProcess);
-                        postProcessFrame(startTime, frameToProcess);
-                        finishedProcessing = true;
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        frameToProcess = exceptionHandler.handleException(e, frameToProcess);
-                        finishedProcessing = true;
-                    }
-                }
-                message.doneReading();
-                break;
-            case EOD:
-                message.doneReading();
-                timer.cancel();
-                callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
-                break;
-            case EOSD:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Done processing spillage");
-                }
-                message.doneReading();
-                callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
-                break;
-
-        }
-    }
-
-    private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
-        int desiredTuples = (int) (fta.getTupleCount() * retainFraction);
-        return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
-    }
-
-    public Mode getMode() {
-        return inputHandler.getMode();
-    }
-
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    public void setFrameWriter(IFrameWriter frameWriter) {
-        this.frameWriter = frameWriter;
-    }
-
-    public void reset() {
-        active = true;
-        if (logInflowOutflowRate) {
-            metricCollector.resetReportSender(inflowReportSenderId);
-            metricCollector.resetReportSender(outflowReportSenderId);
-        }
-    }
-
-    public int getProcessingRate() {
-        return processingRate;
-    }
-
-    public Map<Integer, Long> getTupleTimeStats() {
-        return tupleTimeStats;
-    }
-
-    public long getAvgDelayRecordPersistence() {
-        return avgDelayPersistence;
-    }
-
-    public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
-        return storageTimeTrackingRateTask;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java
deleted file mode 100644
index 3434b4f..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.common.feeds.message.FeedReportMessage;
-import org.apache.asterix.common.feeds.message.ScaleInReportMessage;
-import org.apache.asterix.common.feeds.message.StorageReportFeedMessage;
-
-public class MonitoredBufferTimerTasks {
-
-    private static final Logger LOGGER = Logger.getLogger(MonitorInputQueueLengthTimerTask.class.getName());
-
-    public static class MonitoredBufferStorageTimerTask extends TimerTask {
-
-        private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
-
-        private final StorageSideMonitoredBuffer mBuffer;
-        private final IFeedManager feedManager;
-        private final int partition;
-        private final FeedConnectionId connectionId;
-        private final FeedPolicyAccessor policyAccessor;
-        private final StorageFrameHandler storageFromeHandler;
-        private final StorageReportFeedMessage storageReportMessage;
-        private final FeedTupleCommitAckMessage tupleCommitAckMessage;
-
-        private Map<Integer, Integer> maxIntakeBaseCovered;
-        private int countDelayExceeded = 0;
-
-        public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
-                FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
-                StorageFrameHandler storageFromeHandler) {
-            this.mBuffer = mBuffer;
-            this.feedManager = feedManager;
-            this.connectionId = connectionId;
-            this.partition = partition;
-            this.policyAccessor = policyAccessor;
-            this.storageFromeHandler = storageFromeHandler;
-            this.storageReportMessage = new StorageReportFeedMessage(this.connectionId, this.partition, 0, false, 0, 0);
-            this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(this.connectionId, 0, 0, null);
-            this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
-        }
-
-        @Override
-        public void run() {
-            if (mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled()) {
-                ackRecords();
-            }
-            if (mBuffer.isTimeTrackingEnabled()) {
-                checkLatencyViolation();
-            }
-        }
-
-        private void ackRecords() {
-            Set<Integer> partitions = storageFromeHandler.getPartitionsWithStats();
-            List<Integer> basesCovered = new ArrayList<Integer>();
-            for (int intakePartition : partitions) {
-                Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
-                        .getBaseAcksForPartition(intakePartition);
-                for (Entry<Integer, IntakePartitionStatistics> entry : baseAcks.entrySet()) {
-                    int base = entry.getKey();
-                    IntakePartitionStatistics stats = entry.getValue();
-                    Integer maxIntakeBaseForPartition = maxIntakeBaseCovered.get(intakePartition);
-                    if (maxIntakeBaseForPartition == null || maxIntakeBaseForPartition < base) {
-                        tupleCommitAckMessage.reset(intakePartition, base, stats.getAckInfo());
-                        feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
-                    } else {
-                        basesCovered.add(base);
-                    }
-                }
-                for (Integer b : basesCovered) {
-                    baseAcks.remove(b);
-                }
-                basesCovered.clear();
-            }
-        }
-
-        private void checkLatencyViolation() {
-            long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
-            if (avgDelayPersistence > policyAccessor.getMaxDelayRecordPersistence()) {
-                countDelayExceeded++;
-                if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
-                    storageReportMessage.reset(0, false, mBuffer.getAvgDelayRecordPersistence());
-                    feedManager.getFeedMessageService().sendMessage(storageReportMessage);
-                }
-            } else {
-                countDelayExceeded = 0;
-            }
-        }
-
-        public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
-            maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
-        }
-    }
-
-    public static class LogInputOutputRateTask extends TimerTask {
-
-        private final MonitoredBuffer mBuffer;
-        private final boolean log;
-        private final boolean reportInflow;
-        private final boolean reportOutflow;
-
-        private final IFeedMessageService messageService;
-        private final FeedReportMessage message;
-
-        public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
-            this.mBuffer = mBuffer;
-            this.log = log;
-            this.reportInflow = reportInflow;
-            this.reportOutflow = reportOutflow;
-            if (reportInflow || reportOutflow) {
-                ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
-                messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
-                message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
-                        vType, 0);
-            } else {
-                messageService = null;
-                message = null;
-            }
-
-        }
-
-        @Override
-        public void run() {
-            int pendingWork = mBuffer.getWorkSize();
-            int outflowRate = mBuffer.getOutflowRate();
-            int inflowRate = mBuffer.getInflowRate();
-            if (log) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
-                            + outflowRate + " Pending Work " + pendingWork);
-                }
-            }
-            if (reportInflow) {
-                message.reset(inflowRate);
-            } else if (reportOutflow) {
-                message.reset(outflowRate);
-            }
-            messageService.sendMessage(message);
-        }
-    }
-
-    public static class MonitorInputQueueLengthTimerTask extends TimerTask {
-
-        private final MonitoredBuffer mBuffer;
-        private final IFrameEventCallback callback;
-        private final int pendingWorkThreshold;
-        private final int maxSuccessiveThresholdPeriods;
-        private FrameEvent lastEvent = FrameEvent.NO_OP;
-        private int pendingWorkExceedCount = 0;
-
-        public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
-            this.mBuffer = mBuffer;
-            this.callback = callback;
-            AsterixFeedProperties props = mBuffer.getInputHandler().getFeedManager().getAsterixFeedProperties();
-            pendingWorkThreshold = props.getPendingWorkThreshold();
-            maxSuccessiveThresholdPeriods = props.getMaxSuccessiveThresholdPeriod();
-        }
-
-        @Override
-        public void run() {
-            int pendingWork = mBuffer.getWorkSize();
-            if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
-                return;
-            }
-
-            switch (lastEvent) {
-                case NO_OP:
-                case PENDING_WORK_DONE:
-                case FINISHED_PROCESSING_SPILLAGE:
-                    if (pendingWork > pendingWorkThreshold) {
-                        pendingWorkExceedCount++;
-                        if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
-                            pendingWorkExceedCount = 0;
-                            lastEvent = FrameEvent.PENDING_WORK_THRESHOLD_REACHED;
-                            callback.frameEvent(lastEvent);
-                        }
-                    } else if (pendingWork == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
-                        lastEvent = FrameEvent.PENDING_WORK_DONE;
-                        callback.frameEvent(lastEvent);
-                    }
-                    break;
-                case PENDING_WORK_THRESHOLD_REACHED:
-                    if (((pendingWork * 1.0) / pendingWorkThreshold) <= 0.5) {
-                        lastEvent = FrameEvent.PENDING_WORK_DONE;
-                        callback.frameEvent(lastEvent);
-                    }
-                    break;
-                case FINISHED_PROCESSING:
-                    break;
-
-            }
-        }
-    }
-
-    /**
-     * A timer task to measure and compare the processing rate and inflow rate
-     * to look for possibility to scale-in, that is reduce the degree of cardinality
-     * of the compute operator.
-     */
-    public static class MonitoreProcessRateTimerTask extends TimerTask {
-
-        private final MonitoredBuffer mBuffer;
-        private final IFeedManager feedManager;
-        private int nPartitions;
-        private ScaleInReportMessage sMessage;
-        private boolean proposedChange;
-
-        public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
-                FeedConnectionId connectionId, int nPartitions) {
-            this.mBuffer = mBuffer;
-            this.feedManager = feedManager;
-            this.nPartitions = nPartitions;
-            this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
-            this.proposedChange = false;
-        }
-
-        public int getNumberOfPartitions() {
-            return nPartitions;
-        }
-
-        public void setNumberOfPartitions(int nPartitions) {
-            this.nPartitions = nPartitions;
-            proposedChange = false;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
-            }
-        }
-
-        @Override
-        public void run() {
-            if (!proposedChange) {
-                int inflowRate = mBuffer.getInflowRate();
-                int procRate = mBuffer.getProcessingRate();
-                if (inflowRate > 0 && procRate > 0) {
-                    if (inflowRate < procRate) {
-                        int possibleCardinality = (int) Math.ceil(nPartitions * inflowRate / (double) procRate);
-                        if (possibleCardinality < nPartitions
-                                && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= 0.25)) {
-                            sMessage.reset(nPartitions, possibleCardinality);
-                            feedManager.getFeedMessageService().sendMessage(sMessage);
-                            proposedChange = true;
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Proposed scale-in " + sMessage);
-                            }
-                        }
-                    } else {
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
-                                    + ")");
-                        }
-                    }
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java
deleted file mode 100644
index b654563..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-
-/**
- * Counts the feed runtimes assigned to a node so that nodes can be ordered by
- * that count.
- */
-public class NodeLoad implements Comparable<NodeLoad> {
-
-    private final String nodeId;
-
-    /** Number of runtimes currently counted against this node. */
-    private int nRuntimes;
-
-    /**
-     * Creates a load record with no runtimes counted yet.
-     *
-     * @param nodeId identifier of the node this record describes
-     */
-    public NodeLoad(String nodeId) {
-        this.nodeId = nodeId;
-        this.nRuntimes = 0;
-    }
-
-    /** Counts one more runtime against this node. */
-    public void addLoad() {
-        nRuntimes++;
-    }
-
-    /**
-     * Counts one runtime less against this node.
-     *
-     * @param runtimeType type of the removed runtime; not consulted here, every
-     *            runtime weighs the same
-     */
-    public void removeLoad(FeedRuntimeType runtimeType) {
-        nRuntimes--;
-    }
-
-    /**
-     * Orders nodes by ascending runtime count.
-     *
-     * @param o the node load to compare against
-     * @return a negative value, zero, or a positive value when this node has
-     *         fewer, the same number of, or more runtimes than {@code o}
-     */
-    @Override
-    public int compareTo(NodeLoad o) {
-        if (this == o) {
-            return 0;
-        }
-        // Integer.compare cannot overflow, unlike subtracting the two counts
-        // (setnRuntimes accepts arbitrary values).
-        return Integer.compare(nRuntimes, o.getnRuntimes());
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public int getnRuntimes() {
-        return nRuntimes;
-    }
-
-    public void setnRuntimes(int nRuntimes) {
-        this.nRuntimes = nRuntimes;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java
deleted file mode 100644
index a509341..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Snapshot of a node's load: CPU load, used heap and number of feed runtimes.
- * <p>
- * Equality and hashing use the node id only, while {@link #compareTo} orders by
- * runtime count and then CPU load, so the natural ordering is not consistent
- * with {@code equals}.
- */
-public class NodeLoadReport implements Comparable<NodeLoadReport> {
-
-    private final String nodeId;
-    private float cpuLoad;
-    private double usedHeap;
-    private int nRuntimes;
-
-    public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
-        this.nodeId = nodeId;
-        this.cpuLoad = cpuLoad;
-        this.usedHeap = usedHeap;
-        this.nRuntimes = nRuntimes;
-    }
-
-    /**
-     * Builds a report from its JSON form.
-     *
-     * @param obj JSON object carrying the node id, CPU load, heap usage and
-     *            runtime count under the {@code FeedConstants.MessageConstants} keys
-     * @return the decoded report
-     * @throws JSONException if a key is missing or has an incompatible type
-     */
-    public static NodeLoadReport read(JSONObject obj) throws JSONException {
-        NodeLoadReport r = new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
-                (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
-                (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
-                obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
-        return r;
-    }
-
-    /** Two reports are equal when they describe the same node id. */
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof NodeLoadReport)) {
-            return false;
-        }
-        return ((NodeLoadReport) o).nodeId.equals(nodeId);
-    }
-
-    @Override
-    public int hashCode() {
-        return nodeId.hashCode();
-    }
-
-    /**
-     * Orders reports by ascending runtime count, breaking ties by ascending CPU
-     * load.
-     *
-     * @param o the report to compare against
-     * @return a negative value, zero, or a positive value per the
-     *         {@link Comparable} contract
-     */
-    @Override
-    public int compareTo(NodeLoadReport o) {
-        if (nRuntimes != o.getnRuntimes()) {
-            // Integer.compare avoids the overflow that subtraction can produce.
-            return Integer.compare(nRuntimes, o.getnRuntimes());
-        }
-        // Casting the float difference to int would truncate any difference
-        // smaller than 1.0 to zero and report unequal loads as ties.
-        return Float.compare(this.cpuLoad, o.cpuLoad);
-    }
-
-    public float getCpuLoad() {
-        return cpuLoad;
-    }
-
-    public void setCpuLoad(float cpuLoad) {
-        this.cpuLoad = cpuLoad;
-    }
-
-    public double getUsedHeap() {
-        return usedHeap;
-    }
-
-    public void setUsedHeap(double usedHeap) {
-        this.usedHeap = usedHeap;
-    }
-
-    public int getnRuntimes() {
-        return nRuntimes;
-    }
-
-    public void setnRuntimes(int nRuntimes) {
-        this.nRuntimes = nRuntimes;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
deleted file mode 100644
index 6be0211..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.OperatingSystemMXBean;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-import org.apache.asterix.common.feeds.api.IFeedService;
-import org.apache.asterix.common.feeds.message.NodeReportMessage;
-
-/**
- * Feed service that samples this node's load on a timer and forwards a
- * {@link NodeReportMessage} through the feed message service whenever the
- * sample differs enough from the last one sent.
- */
-public class NodeLoadReportService implements IFeedService {
-
-    /** Sampling period handed to {@link Timer#schedule(TimerTask, long, long)}. */
-    private static final int NODE_LOAD_REPORT_FREQUENCY = 2000;
-    /** Relative change in CPU load above which a new report is sent. */
-    private static final float CPU_CHANGE_THRESHOLD = 0.2f;
-    /** Relative change in used-heap ratio above which a new report is sent. */
-    private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
-
-    private final NodeLoadReportTask task;
-    private final Timer timer;
-
-    public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
-        this.timer = new Timer();
-        this.task = new NodeLoadReportTask(nodeId, feedManager);
-    }
-
-    /** Starts periodic sampling; the first sample is taken immediately. */
-    @Override
-    public void start() throws Exception {
-        timer.schedule(task, 0, NODE_LOAD_REPORT_FREQUENCY);
-    }
-
-    /** Cancels the sampling timer. */
-    @Override
-    public void stop() {
-        timer.cancel();
-    }
-
-    /** Timer task that takes one load sample and reports it when it changed noticeably. */
-    private static class NodeLoadReportTask extends TimerTask {
-
-        private static OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
-        private static MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
-
-        private final IFeedManager feedManager;
-        private final IFeedMessageService messageService;
-        /** Last values sent; reused as the outgoing message object. */
-        private final NodeReportMessage message;
-
-        public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
-            this.feedManager = feedManager;
-            this.messageService = feedManager.getFeedMessageService();
-            this.message = new NodeReportMessage(0.0f, 0L, 0);
-        }
-
-        @Override
-        public void run() {
-            int runtimeCount = feedManager.getFeedConnectionManager().getRegisteredRuntimes().size();
-            double currentCpu = getCpuLoad();
-            double currentHeap = getUsedHeap();
-            if (!changedEnough(runtimeCount, currentCpu, currentHeap)) {
-                return;
-            }
-            message.reset(currentCpu, currentHeap, runtimeCount);
-            messageService.sendMessage(message);
-        }
-
-        /**
-         * Decides whether the new sample differs enough from the last reported
-         * one: relative CPU or heap change above its threshold, or a different
-         * runtime count. The message field is final and always assigned, so no
-         * null check is needed.
-         */
-        private boolean changedEnough(int nRuntimes, double cpuLoad, double usedHeap) {
-            double cpuDelta = Math.abs(cpuLoad - message.getCpuLoad());
-            boolean cpuMoved = (cpuDelta / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
-
-            double heapDelta = Math.abs(usedHeap - message.getUsedHeap());
-            boolean heapMoved = (heapDelta / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
-
-            boolean runtimesMoved = message.getnRuntimes() != nRuntimes;
-
-            if (cpuMoved) {
-                return true;
-            }
-            return heapMoved || runtimesMoved;
-        }
-
-        /** @return the system load average as reported by the operating system bean */
-        private double getCpuLoad() {
-            return osBean.getSystemLoadAverage();
-        }
-
-        /** @return used heap divided by maximum heap */
-        private double getUsedHeap() {
-            long used = memBean.getHeapMemoryUsage().getUsed();
-            long max = memBean.getHeapMemoryUsage().getMax();
-            return ((double) used) / max;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java
deleted file mode 100644
index 6f438ad..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-
-/**
- * Base type for a feed metric series. It carries the metric type and a running
- * sum; how samples are folded in and cleared is left to subclasses.
- */
-public abstract class Series {
-
-    /** Kind of metric this series represents. */
-    protected final MetricType type;
-
-    /** Sum maintained by subclasses as values are added. */
-    protected int runningSum;
-
-    public Series(MetricType type) {
-        this.type = type;
-    }
-
-    /** Adds one sample to the series. */
-    public abstract void addValue(int value);
-
-    /** Clears the state the subclass derives its metric from. */
-    public abstract void reset();
-
-    public MetricType getType() {
-        return this.type;
-    }
-
-    public int getRunningSum() {
-        return this.runningSum;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java
deleted file mode 100644
index 6bfe925..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-
-/**
- * Series that reports the integer average of the non-negative values added
- * since the last reset.
- */
-public class SeriesAvg extends Series {
-
-    /** Number of values folded into the running sum since the last reset. */
-    private int count;
-
-    public SeriesAvg() {
-        super(MetricType.AVG);
-    }
-
-    /**
-     * @return the integer average of the values added since the last reset, or
-     *         0 when no value has been added yet (instead of dividing by zero)
-     */
-    public synchronized int getAvg() {
-        if (count == 0) {
-            return 0;
-        }
-        return runningSum / count;
-    }
-
-    /**
-     * Adds a sample to the average; negative values are ignored.
-     *
-     * @param value the sample to add
-     */
-    @Override
-    public synchronized void addValue(int value) {
-        if (value < 0) {
-            return;
-        }
-        runningSum += value;
-        count++;
-    }
-
-    /**
-     * Starts a fresh averaging window. The sum is cleared together with the
-     * count; otherwise the next average would divide the old sum by the new
-     * count.
-     */
-    @Override
-    public synchronized void reset() {
-        runningSum = 0;
-        count = 0;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java
deleted file mode 100644
index 9d0b0ea..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-
-/**
- * Series that reports a per-second rate, recomputed by a timer task every
- * {@link #REFRESH_MEASUREMENT} milliseconds from the growth of the running sum.
- */
-public class SeriesRate extends Series {
-
-    private static final long REFRESH_MEASUREMENT = 5000; // 5 seconds
-
-    // Written by the timer thread, read by callers of getRate().
-    private volatile int rate;
-    private Timer timer;
-    private RateComputingTask task;
-
-    /** Creates the series and immediately starts the rate-computing timer. */
-    public SeriesRate() {
-        super(MetricType.RATE);
-        begin();
-    }
-
-    /** @return the rate computed at the last timer tick, in values per second */
-    public int getRate() {
-        return rate;
-    }
-
-    /**
-     * Adds a sample to the running sum; negative values are ignored.
-     *
-     * @param value the sample to add
-     */
-    @Override
-    public synchronized void addValue(int value) {
-        if (value < 0) {
-            return;
-        }
-        runningSum += value;
-    }
-
-    /** Starts the periodic rate computation if it has not been started yet. */
-    public void begin() {
-        if (timer == null) {
-            timer = new Timer();
-            task = new RateComputingTask(this);
-            timer.scheduleAtFixedRate(task, 0, REFRESH_MEASUREMENT);
-        }
-    }
-
-    /** Cancels the periodic rate computation. */
-    public void end() {
-        if (timer != null) {
-            timer.cancel();
-        }
-    }
-
-    /**
-     * Clears the rate and the accumulated sum. The sum is cleared together with
-     * the task's baseline; otherwise the next tick would report the whole
-     * history as one interval's growth.
-     */
-    @Override
-    public synchronized void reset() {
-        rate = 0;
-        runningSum = 0;
-        if (task != null) {
-            task.reset();
-        }
-    }
-
-    /** Timer task that turns the growth of the running sum into a per-second rate. */
-    private class RateComputingTask extends TimerTask {
-
-        /** Running sum observed at the previous tick. */
-        private int lastMeasured = 0;
-        private final SeriesRate series;
-
-        public RateComputingTask(SeriesRate series) {
-            this.series = series;
-        }
-
-        @Override
-        public void run() {
-            // Same monitor as addValue()/reset(), so sum and baseline are read consistently.
-            synchronized (series) {
-                int currentValue = series.getRunningSum();
-                // Scale in long arithmetic: int * 1000 overflows once the
-                // interval's growth exceeds roughly 2.1 million.
-                rate = (int) (((currentValue - lastMeasured) * 1000L) / REFRESH_MEASUREMENT);
-                lastMeasured = currentValue;
-            }
-        }
-
-        /** Resets the baseline; called by the enclosing series while it holds its monitor. */
-        public void reset() {
-            lastMeasured = 0;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java
deleted file mode 100644
index 5b99b8c..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Records, per intake partition and acknowledgement window, which records have
- * reached storage, and tracks the average intake-to-storage delay of the most
- * recently processed frame.
- */
-public class StorageFrameHandler {
-
-    /** partition -> (acknowledgement window base -> statistics for that window). */
-    private final Map<Integer, Map<Integer, IntakePartitionStatistics>> intakeStatistics;
-    /** Average delay (ms) over the tuples of the last non-empty frame. */
-    private long avgDelayPersistence;
-
-    public StorageFrameHandler() {
-        intakeStatistics = new HashMap<Integer, Map<Integer, IntakePartitionStatistics>>();
-        avgDelayPersistence = 0L;
-    }
-
-    /**
-     * Acknowledges every record in the frame, stamps each with the current time
-     * as its store timestamp, and updates the average persistence delay.
-     * <p>
-     * NOTE(review): the offsets below assume the tracking fields (tuple id,
-     * partition, intake timestamp, store timestamp) sit in that order in the
-     * record's open part - confirm against the code that writes them.
-     *
-     * @param frame the frame whose records are read and stamped in place
-     * @param frameAccessor accessor already positioned on {@code frame}
-     */
-    public synchronized void updateTrackingInformation(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
-        int nTuples = frameAccessor.getTupleCount();
-        if (nTuples <= 0) {
-            // Empty frame: keep the previous average instead of dividing by zero.
-            return;
-        }
-        long delay = 0;
-        long intakeTimestamp;
-        long currentTime = System.currentTimeMillis();
-        int partition = 0;
-        int recordId = 0;
-        for (int i = 0; i < nTuples; i++) {
-            int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
-            int openPartOffsetOrig = frame.getInt(recordStart + 6);
-            int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
-            int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
-                    + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
-            recordId = frame.getInt(recordStart + recordIdOffset);
-
-            int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2) + 1;
-            partition = frame.getInt(recordStart + partitionOffset);
-
-            ackRecordId(partition, recordId);
-            int intakeTimestampValueOffset = partitionOffset + 4 + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2)
-                    + 1;
-            intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
-
-            int storeTimestampValueOffset = intakeTimestampValueOffset + 8
-                    + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
-            frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
-            delay += currentTime - intakeTimestamp;
-        }
-        avgDelayPersistence = delay / nTuples;
-    }
-
-    /** Marks a record as acknowledged in the window it falls into, creating the window on first use. */
-    private void ackRecordId(int partition, int recordId) {
-        Map<Integer, IntakePartitionStatistics> map = intakeStatistics.get(partition);
-        if (map == null) {
-            map = new HashMap<Integer, IntakePartitionStatistics>();
-            intakeStatistics.put(partition, map);
-        }
-        int base = (int) Math.ceil(recordId * 1.0 / IntakePartitionStatistics.ACK_WINDOW_SIZE);
-        IntakePartitionStatistics intakeStatsForBaseOfPartition = map.get(base);
-        if (intakeStatsForBaseOfPartition == null) {
-            intakeStatsForBaseOfPartition = new IntakePartitionStatistics(partition, base);
-            map.put(base, intakeStatsForBaseOfPartition);
-        }
-        intakeStatsForBaseOfPartition.ackRecordId(recordId);
-    }
-
-    /**
-     * @param partition intake partition
-     * @param base acknowledgement window base
-     * @return the acknowledgement data for that window, or {@code null} when
-     *         nothing has been recorded for it
-     */
-    public synchronized byte[] getAckData(int partition, int base) {
-        // Synchronized because updateTrackingInformation mutates the maps under the same lock.
-        Map<Integer, IntakePartitionStatistics> intakeStats = intakeStatistics.get(partition);
-        if (intakeStats != null) {
-            IntakePartitionStatistics intakePartitionStats = intakeStats.get(base);
-            if (intakePartitionStats != null) {
-                return intakePartitionStats.getAckInfo();
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Returns a snapshot of the acknowledgement windows of a partition.
-     *
-     * @param partition intake partition
-     * @return a new map (window base -> statistics) that callers may iterate
-     *         while frames keep arriving; empty when the partition has no
-     *         statistics yet
-     */
-    public synchronized Map<Integer, IntakePartitionStatistics> getBaseAcksForPartition(int partition) {
-        Map<Integer, IntakePartitionStatistics> intakeStatsForPartition = intakeStatistics.get(partition);
-        Map<Integer, IntakePartitionStatistics> clone = new HashMap<Integer, IntakePartitionStatistics>();
-        if (intakeStatsForPartition != null) {
-            for (Entry<Integer, IntakePartitionStatistics> entry : intakeStatsForPartition.entrySet()) {
-                clone.put(entry.getKey(), entry.getValue());
-            }
-        }
-        // Return the copy, not the live map that is mutated under this lock.
-        return clone;
-    }
-
-    public long getAvgDelayPersistence() {
-        return avgDelayPersistence;
-    }
-
-    public void setAvgDelayPersistence(long avgDelayPersistence) {
-        this.avgDelayPersistence = avgDelayPersistence;
-    }
-
-    /** @return the live key set of partitions that have statistics (not a copy) */
-    public Set<Integer> getPartitionsWithStats() {
-        return intakeStatistics.keySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
deleted file mode 100644
index 4027237..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Monitored buffer variant that, when acking or time tracking is enabled by the
- * feed policy, acknowledges records before a frame is processed and logs
- * intake-to-storage travel times afterwards.
- */
-public class StorageSideMonitoredBuffer extends MonitoredBuffer {
-
-    /** Period (ms) of the storage time-tracking timer task. */
-    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 5 seconds
-
-    private boolean ackingEnabled;
-    private final boolean timeTrackingEnabled;
-
-    public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
-            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
-            FeedPolicyAccessor policyAccessor) {
-        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
-                exceptionHandler, callback, nPartitions, policyAccessor);
-        timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
-        ackingEnabled = policyAccessor.atleastOnceSemantics();
-        // The frame handler and its reporting task exist only when the policy asks for them.
-        if (ackingEnabled || timeTrackingEnabled) {
-            storageFromeHandler = new StorageFrameHandler();
-            this.storageTimeTrackingRateTask = new MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask(this,
-                    inputHandler.getFeedManager(), connectionId, runtimeId.getPartition(), policyAccessor,
-                    storageFromeHandler);
-            this.timer.scheduleAtFixedRate(storageTimeTrackingRateTask, 0, STORAGE_TIME_TRACKING_FREQUENCY);
-        }
-    }
-
-    @Override
-    protected boolean monitorProcessingRate() {
-        return false;
-    }
-
-    @Override
-    protected boolean logInflowOutflowRate() {
-        return true;
-    }
-
-    /**
-     * @return a pre-processor that, when acking is enabled, records
-     *         acknowledgements for the frame; failures are logged and do not
-     *         interrupt frame processing
-     */
-    @Override
-    public IFramePreprocessor getFramePreProcessor() {
-        return new IFramePreprocessor() {
-
-            @Override
-            public void preProcess(ByteBuffer frame) {
-                try {
-                    if (ackingEnabled) {
-                        storageFromeHandler.updateTrackingInformation(frame, inflowFta);
-                    }
-                } catch (Exception e) {
-                    // Best effort: tracking must not break ingestion, but report the
-                    // failure through the logger rather than printing to stderr.
-                    LOGGER.log(java.util.logging.Level.WARNING, "Unable to update tracking information", e);
-                }
-            }
-        };
-    }
-
-    /**
-     * @return a post-processor that, when acking or time tracking is enabled,
-     *         stamps each record with a store timestamp and logs the average
-     *         travel time of the frame's records, bucketed by how long after
-     *         the first observed intake timestamp they were ingested
-     */
-    @Override
-    protected IFramePostProcessor getFramePostProcessor() {
-        return new IFramePostProcessor() {
-
-            // Bucket boundaries (ms) relative to the first intake timestamp seen.
-            private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
-            private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
-
-            private long delayNormalWindow = 0;
-            private long delayHighWindow = 0;
-            private long delayLowWindow = 0;
-
-            private int countNormalWindow;
-            private int countHighWindow;
-            private int countLowWindow;
-
-            /** Intake timestamp of the first record ever seen; 0 until then. */
-            private long beginIntakeTimestamp = 0;
-
-            @Override
-            public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
-                if (ackingEnabled || timeTrackingEnabled) {
-                    int nTuples = frameAccessor.getTupleCount();
-                    long intakeTimestamp;
-                    long currentTime = System.currentTimeMillis();
-                    for (int i = 0; i < nTuples; i++) {
-                        // NOTE(review): offsets assume the tracking fields sit in the
-                        // record's open part in the order tuple id, partition, intake
-                        // timestamp, store timestamp - confirm against the writer.
-                        int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
-                        int openPartOffsetOrig = frame.getInt(recordStart + 6);
-                        int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
-                        int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
-                                + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
-
-                        int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
-                                + 1;
-
-                        int intakeTimestampValueOffset = partitionOffset + 4
-                                + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
-                        intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
-                        if (beginIntakeTimestamp == 0) {
-                            beginIntakeTimestamp = intakeTimestamp;
-                            LOGGER.warning("Begin Timestamp: " + beginIntakeTimestamp);
-                        }
-
-                        updateRunningAvg(intakeTimestamp, currentTime);
-
-                        int storeTimestampValueOffset = intakeTimestampValueOffset + 8
-                                + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
-                        frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
-                    }
-                    // Averages are logged and cleared per frame.
-                    logRunningAvg();
-                    resetRunningAvg();
-                }
-            }
-
-            /** Adds one record's delay to the bucket selected by its intake time offset. */
-            private void updateRunningAvg(long intakeTimestamp, long currentTime) {
-                long diffTimestamp = intakeTimestamp - beginIntakeTimestamp;
-                long delay = (currentTime - intakeTimestamp);
-                if (diffTimestamp < NORMAL_WINDOW_LIMIT) {
-                    delayNormalWindow += delay;
-                    countNormalWindow++;
-                } else if (diffTimestamp < HIGH_WINDOW_LIMIT) {
-                    delayHighWindow += delay;
-                    countHighWindow++;
-                } else {
-                    delayLowWindow += delay;
-                    countLowWindow++;
-                }
-            }
-
-            private void resetRunningAvg() {
-                delayNormalWindow = 0;
-                countNormalWindow = 0;
-                delayHighWindow = 0;
-                countHighWindow = 0;
-                delayLowWindow = 0;
-                countLowWindow = 0;
-            }
-
-            /** Logs the average delay of each non-empty bucket (window ids 0, 1, 2). */
-            private void logRunningAvg() {
-                if (countNormalWindow != 0 && delayNormalWindow != 0) {
-                    LOGGER.warning("Window:" + 0 + ":" + "Avg Travel_Time:" + (delayNormalWindow / countNormalWindow));
-                }
-                if (countHighWindow != 0 && delayHighWindow != 0) {
-                    LOGGER.warning("Window:" + 1 + ":" + "Avg Travel_Time:" + (delayHighWindow / countHighWindow));
-                }
-                if (countLowWindow != 0 && delayLowWindow != 0) {
-                    LOGGER.warning("Window:" + 2 + ":" + "Avg Travel_Time:" + (delayLowWindow / countLowWindow));
-                }
-            }
-
-        };
-    }
-
-    public boolean isAckingEnabled() {
-        return ackingEnabled;
-    }
-
-    public void setAcking(boolean ackingEnabled) {
-        this.ackingEnabled = ackingEnabled;
-    }
-
-    public boolean isTimeTrackingEnabled() {
-        return timeTrackingEnabled;
-    }
-
-    @Override
-    protected boolean monitorInputQueueLength() {
-        return true;
-    }
-
-    @Override
-    protected boolean reportOutflowRate() {
-        return true;
-    }
-
-    @Override
-    protected boolean reportInflowRate() {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
deleted file mode 100644
index 7eb5921..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-
-public class SubscribableFeedRuntimeId extends FeedRuntimeId {
-    private static final long serialVersionUID = 1L;
-    private final FeedId feedId;
-
-    public SubscribableFeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition) {
-        super(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
-        this.feedId = feedId;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof SubscribableFeedRuntimeId)) {
-            return false;
-        }
-
-        return (super.equals(o) && this.feedId.equals(((SubscribableFeedRuntimeId) o).getFeedId()));
-    }
-
-    @Override
-    public int hashCode() {
-        return super.hashCode() + feedId.hashCode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java
deleted file mode 100644
index b09b3ff..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class SubscribableRuntime extends FeedRuntime implements ISubscribableRuntime {
-
-    protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
-
-    protected final FeedId feedId;
-    protected final List<ISubscriberRuntime> subscribers;
-    protected final RecordDescriptor recordDescriptor;
-    protected final DistributeFeedFrameWriter dWriter;
-
-    public SubscribableRuntime(FeedId feedId, FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler,
-            DistributeFeedFrameWriter dWriter, RecordDescriptor recordDescriptor) {
-        super(runtimeId, inputHandler, dWriter);
-        this.feedId = feedId;
-        this.recordDescriptor = recordDescriptor;
-        this.dWriter = dWriter;
-        this.subscribers = new ArrayList<ISubscriberRuntime>();
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    @Override
-    public String toString() {
-        return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")";
-    }
-
-    @Override
-    public synchronized void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime)
-            throws Exception {
-        FeedFrameCollector collector = dWriter.subscribeFeed(new FeedPolicyAccessor(collectionRuntime.getFeedPolicy()),
-                collectionRuntime.getInputHandler(), collectionRuntime.getConnectionId());
-        collectionRuntime.setFrameCollector(collector);
-        subscribers.add(collectionRuntime);
-    }
-
-    @Override
-    public synchronized void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
-        dWriter.unsubscribeFeed(collectionRuntime.getFeedFrameWriter());
-        subscribers.remove(collectionRuntime);
-    }
-
-    @Override
-    public synchronized List<ISubscriberRuntime> getSubscribers() {
-        return subscribers;
-    }
-
-    @Override
-    public DistributeFeedFrameWriter getFeedFrameWriter() {
-        return dWriter;
-    }
-
-    public FeedRuntimeType getFeedRuntimeType() {
-        return runtimeId.getFeedRuntimeType();
-    }
-
-    @Override
-    public RecordDescriptor getRecordDescriptor() {
-        return recordDescriptor;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
deleted file mode 100644
index 2eb6caa..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import org.apache.asterix.common.feeds.FeedId;
-
-public interface IAdapterRuntimeManager {
-
-    public enum State {
-        /**
-         * Indicates that AsterixDB is maintaining the flow of data from external source into its storage.
-         */
-        ACTIVE_INGESTION,
-
-        /**
-         * Indicates that data from external source is being buffered and not
-         * pushed downstream
-         */
-
-        INACTIVE_INGESTION,
-        /**
-         * Indicates that feed ingestion activity has finished.
-         */
-        FINISHED_INGESTION,
-
-        /** Indicates the occurrence of a failure during the intake stage of a data ingestion pipeline **/
-        FAILED_INGESTION
-    }
-
-    /**
-     * Start feed ingestion
-     * @throws Exception
-     */
-    public void start() throws Exception;
-
-    /**
-     * Stop feed ingestion.
-     * @throws Exception
-     */
-    public void stop() throws Exception;
-
-    /**
-     * @return feedId associated with the feed that is being ingested
-     */
-    public FeedId getFeedId();
-
-    /**
-     * @return the instance of the feed adapter (an implementation of {@code IFeedAdapter}) in use.
-     */
-    public IDataSourceAdapter getFeedAdapter();
-
-    /**
-     * @return state associated with the AdapterRuntimeManager. See {@code State}.
-     */
-    public State getState();
-
-    /**
-     * @param state
-     */
-    public void setState(State state);
-
-    public IIntakeProgressTracker getProgressTracker();
-
-    public int getPartition();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java
deleted file mode 100644
index 0092e6b..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface ICentralFeedManager {
-
-    public void start() throws AsterixException;
-
-    public void stop() throws AsterixException, IOException;
-
-    public IFeedTrackingManager getFeedTrackingManager();
-
-    public IFeedLoadManager getFeedLoadManager();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
deleted file mode 100644
index 9dd4e76..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * A super interface implemented by a data source adapter. An adapter can be
- * pull based or push based. This interface provides all common APIs that need
- * to be implemented by each adapter irrespective of the kind of
- * adapter (pull or push).
- */
-public interface IDataSourceAdapter extends Serializable {
-
-    /**
-     * Triggers the adapter to begin ingesting data from the external source.
-     * 
-     * @param partition
-     *            The adapter could be running with a degree of parallelism.
-     *            partition corresponds to the i'th parallel instance.
-     * @param writer
-     *            The instance of frame writer that is used by the adapter to
-     *            write frames to. The adapter packs the fetched bytes (from the external
-     *            source) into frames and forwards the frames to an upstream receiving
-     *            operator using the instance of IFrameWriter.
-     * @throws Exception
-     */
-    public void start(int partition, IFrameWriter writer) throws Exception;
-
-    /**
-     * Discontinue the ingestion of data.
-     *
-     * @throws Exception
-     */
-    public boolean stop() throws Exception;
-
-    /**
-     * @param e
-     * @return true if the ingestion should continue post the exception else false
-     */
-    public boolean handleException(Throwable e);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java
deleted file mode 100644
index db2c890..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Handles an exception encountered during processing of a data frame.
- * In the case when the exception is of type {@code FrameDataException}, the causing
- * tuple is logged and a new frame with the tuples after the exception-generating tuple
- * is returned. This functionality is used during feed ingestion to bypass an
- * exception-generating tuple and thus keep the data flow from terminating.
- */
-public interface IExceptionHandler {
-
-    /**
-     * @param e
-     *            the exception that needs to be handled
-     * @param frame
-     *            the frame that was being processed when exception occurred
-     * @return returns a new frame with tuples after the exception generating tuple
-     * @throws HyracksDataException
-     */
-    public ByteBuffer handleException(Exception e, ByteBuffer frame);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java
deleted file mode 100644
index 014a868..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeManager;
-
-/**
- * Handle (de)registration of feeds for delivery of control messages.
- */
-public interface IFeedConnectionManager {
-
-    /**
-     * Allows registration of a feedRuntime.
-     * 
-     * @param feedRuntime
-     * @throws Exception
-     */
-    public void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) throws Exception;
-
-    /**
-     * Obtain feed runtime corresponding to a feedRuntimeId
-     * 
-     * @param feedRuntimeId
-     * @return
-     */
-    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
-
-    /**
-     * De-register a feed
-     * 
-     * @param feedConnection
-     * @throws IOException
-     */
-    void deregisterFeed(FeedConnectionId feedConnection);
-
-    /**
-     * Obtain the feed runtime manager associated with a feed.
-     * 
-     * @param feedConnection
-     * @return
-     */
-    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedConnection);
-
-    /**
-     * Allows de-registration of a feed runtime.
-     * 
-     * @param feedRuntimeId
-     */
-    void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
-
-    public List<FeedRuntimeId> getRegisteredRuntimes();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java
deleted file mode 100644
index 3a95e51..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.asterix.common.feeds.DataBucket;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFeedFrameHandler {
-
-    public void handleFrame(ByteBuffer frame) throws HyracksDataException;
-
-    public void handleDataBucket(DataBucket bucket);
-
-    public void close();
-
-    public Iterator<ByteBuffer> replayData() throws HyracksDataException;
-
-    public String getSummary();
-
-}