You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:13 UTC
[19/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
deleted file mode 100644
index e5a22b5..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBuffer.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.LogInputOutputRateTask;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
-
- protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds
- protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second
- protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds
-
- protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 10th frame
-
- protected final IHyracksTaskContext ctx;
- protected final FeedConnectionId connectionId;
- protected final FeedRuntimeId runtimeId;
- protected final FrameTupleAccessor inflowFta;
- protected final FrameTupleAccessor outflowFta;
- protected final FeedRuntimeInputHandler inputHandler;
- protected final IFrameEventCallback callback;
- protected final Timer timer;
- private final IExceptionHandler exceptionHandler;
- protected final FeedPolicyAccessor policyAccessor;
- protected int nPartitions;
-
- private IFrameWriter frameWriter;
- protected IFeedMetricCollector metricCollector;
- protected boolean monitorProcessingRate = false;
- protected boolean monitorInputQueueLength = false;
- protected boolean logInflowOutflowRate = false;
- protected boolean reportOutflowRate = false;
- protected boolean reportInflowRate = false;
-
- protected int inflowReportSenderId = -1;
- protected int outflowReportSenderId = -1;
- protected TimerTask monitorInputQueueLengthTask;
- protected TimerTask processingRateTask;
- protected TimerTask logInflowOutflowRateTask;
- protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
- protected StorageFrameHandler storageFromeHandler;
-
- protected int processingRate = -1;
- protected int frameCount = 0;
- private long avgDelayPersistence = 0;
- private boolean active;
- private Map<Integer, Long> tupleTimeStats;
- IFramePostProcessor postProcessor = null;
- IFramePreprocessor preProcessor = null;
-
- public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
- IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
- IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
- FeedPolicyAccessor policyAccessor) {
- switch (runtimeId.getFeedRuntimeType()) {
- case COMPUTE:
- return new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
- connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
- case STORE:
- return new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
- connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
- case COLLECT:
- return new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
- connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
- default:
- return new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
- connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
- }
- }
-
- protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
- FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
- FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
- IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
- this.ctx = ctx;
- this.connectionId = connectionId;
- this.frameWriter = frameWriter;
- this.inflowFta = new FrameTupleAccessor(recordDesc);
- this.outflowFta = new FrameTupleAccessor(recordDesc);
- this.runtimeId = runtimeId;
- this.metricCollector = metricCollector;
- this.exceptionHandler = exceptionHandler;
- this.callback = callback;
- this.inputHandler = inputHandler;
- this.timer = new Timer();
- this.policyAccessor = policyAccessor;
- this.nPartitions = nPartitions;
- this.active = true;
- initializeMonitoring();
- }
-
- protected abstract boolean monitorProcessingRate();
-
- protected abstract boolean logInflowOutflowRate();
-
- protected abstract boolean reportOutflowRate();
-
- protected abstract boolean reportInflowRate();
-
- protected abstract boolean monitorInputQueueLength();
-
- protected abstract IFramePreprocessor getFramePreProcessor();
-
- protected abstract IFramePostProcessor getFramePostProcessor();
-
- protected void initializeMonitoring() {
- monitorProcessingRate = monitorProcessingRate();
- monitorInputQueueLength = monitorInputQueueLength();
- reportInflowRate = reportInflowRate();
- reportOutflowRate = reportOutflowRate();
- logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
-
- if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
- this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
- connectionId, nPartitions);
- this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
- }
-
- if (monitorInputQueueLength && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
- || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
- this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
- this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
- }
-
- if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
- this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
- reportOutflowRate);
- this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
- this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
- ValueType.INFLOW_RATE, MetricType.RATE);
- this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
- ValueType.OUTFLOW_RATE, MetricType.RATE);
- }
- }
-
- protected void deinitializeMonitoring() {
- if (monitorInputQueueLengthTask != null) {
- monitorInputQueueLengthTask.cancel();
- }
- if (processingRateTask != null) {
- processingRateTask.cancel();
- }
- if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
- metricCollector.removeReportSender(inflowReportSenderId);
- metricCollector.removeReportSender(outflowReportSenderId);
- logInflowOutflowRateTask.cancel();
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Disabled monitoring for " + this.runtimeId);
- }
- }
-
- protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
- if (monitorProcessingRate) {
- frameCount++;
- if (frameCount % PROCESS_RATE_REFRESH == 0) {
- long endTime = System.currentTimeMillis();
- processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / (endTime - startTime));
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
- }
- frameCount = 0;
- }
- }
-
- if (logInflowOutflowRate || reportOutflowRate) {
- metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
- }
-
- postProcessFrame(frame);
-
- }
-
- protected void preProcessFrame(ByteBuffer frame) throws Exception {
- if (postProcessor == null) {
- preProcessor = getFramePreProcessor();
- }
- if (preProcessor != null) {
- preProcessor.preProcess(frame);
- }
- }
-
- protected void postProcessFrame(ByteBuffer frame) throws Exception {
- if (postProcessor == null) {
- postProcessor = getFramePostProcessor();
- }
- if (postProcessor != null) {
- outflowFta.reset(frame);
- postProcessor.postProcessFrame(frame, outflowFta);
- }
- }
-
- @Override
- public void sendMessage(DataBucket message) {
- inbox.add(message);
- }
-
- public void sendReport(ByteBuffer frame) {
- if ((logInflowOutflowRate || reportInflowRate) && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG)
- || inputHandler.getMode().equals(Mode.PROCESS_SPILL))) {
- inflowFta.reset(frame);
- metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
- }
- }
-
- /** return rate in terms of tuples/sec **/
- public int getInflowRate() {
- return metricCollector.getMetric(inflowReportSenderId);
- }
-
- /** return rate in terms of tuples/sec **/
- public int getOutflowRate() {
- return metricCollector.getMetric(outflowReportSenderId);
- }
-
- /** return the number of pending frames from the input queue **/
- public int getWorkSize() {
- return inbox.size();
- }
-
- /** reset the number of partitions (cardinality) for the runtime **/
- public void setNumberOfPartitions(int nPartitions) {
- if (processingRateTask != null) {
- int currentPartitions = ((MonitoreProcessRateTimerTask) processingRateTask).getNumberOfPartitions();
- if (currentPartitions != nPartitions) {
- ((MonitoreProcessRateTimerTask) processingRateTask).setNumberOfPartitions(nPartitions);
- }
- }
- }
-
- public FeedRuntimeInputHandler getInputHandler() {
- return inputHandler;
- }
-
- public synchronized void close(boolean processPending, boolean disableMonitoring) {
- super.close(processPending);
- if (disableMonitoring) {
- deinitializeMonitoring();
- }
- active = false;
- }
-
- @Override
- public synchronized void processMessage(DataBucket message) throws Exception {
- if (!active) {
- message.doneReading();
- return;
- }
- switch (message.getContentType()) {
- case DATA:
- boolean finishedProcessing = false;
- ByteBuffer frameReceived = message.getContent();
- ByteBuffer frameToProcess = null;
- if (inputHandler.isThrottlingEnabled()) {
- inflowFta.reset(frameReceived);
- int pRate = getProcessingRate();
- int inflowRate = getInflowRate();
- if (inflowRate > pRate) {
- double retainFraction = (pRate * 0.8 / inflowRate);
- frameToProcess = throttleFrame(inflowFta, retainFraction);
- inflowFta.reset(frameToProcess);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Throttling at fraction " + retainFraction + "inflow rate " + inflowRate
- + " no of tuples remaining " + inflowFta.getTupleCount());
-
- }
- } else {
- frameToProcess = frameReceived;
- }
- } else {
- frameToProcess = frameReceived;
- }
- outflowFta.reset(frameToProcess);
- long startTime = 0;
- while (!finishedProcessing) {
- try {
- inflowFta.reset(frameToProcess);
- startTime = System.currentTimeMillis();
- preProcessFrame(frameToProcess);
- frameWriter.nextFrame(frameToProcess);
- postProcessFrame(startTime, frameToProcess);
- finishedProcessing = true;
- } catch (Exception e) {
- e.printStackTrace();
- frameToProcess = exceptionHandler.handleException(e, frameToProcess);
- finishedProcessing = true;
- }
- }
- message.doneReading();
- break;
- case EOD:
- message.doneReading();
- timer.cancel();
- callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
- break;
- case EOSD:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Done processing spillage");
- }
- message.doneReading();
- callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
- break;
-
- }
- }
-
- private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
- int desiredTuples = (int) (fta.getTupleCount() * retainFraction);
- return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
- }
-
- public Mode getMode() {
- return inputHandler.getMode();
- }
-
- public FeedRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
- public void setFrameWriter(IFrameWriter frameWriter) {
- this.frameWriter = frameWriter;
- }
-
- public void reset() {
- active = true;
- if (logInflowOutflowRate) {
- metricCollector.resetReportSender(inflowReportSenderId);
- metricCollector.resetReportSender(outflowReportSenderId);
- }
- }
-
- public int getProcessingRate() {
- return processingRate;
- }
-
- public Map<Integer, Long> getTupleTimeStats() {
- return tupleTimeStats;
- }
-
- public long getAvgDelayRecordPersistence() {
- return avgDelayPersistence;
- }
-
- public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
- return storageTimeTrackingRateTask;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java
deleted file mode 100644
index 3434b4f..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MonitoredBufferTimerTasks.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.common.feeds.message.FeedReportMessage;
-import org.apache.asterix.common.feeds.message.ScaleInReportMessage;
-import org.apache.asterix.common.feeds.message.StorageReportFeedMessage;
-
-public class MonitoredBufferTimerTasks {
-
- private static final Logger LOGGER = Logger.getLogger(MonitorInputQueueLengthTimerTask.class.getName());
-
- public static class MonitoredBufferStorageTimerTask extends TimerTask {
-
- private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
-
- private final StorageSideMonitoredBuffer mBuffer;
- private final IFeedManager feedManager;
- private final int partition;
- private final FeedConnectionId connectionId;
- private final FeedPolicyAccessor policyAccessor;
- private final StorageFrameHandler storageFromeHandler;
- private final StorageReportFeedMessage storageReportMessage;
- private final FeedTupleCommitAckMessage tupleCommitAckMessage;
-
- private Map<Integer, Integer> maxIntakeBaseCovered;
- private int countDelayExceeded = 0;
-
- public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
- FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
- StorageFrameHandler storageFromeHandler) {
- this.mBuffer = mBuffer;
- this.feedManager = feedManager;
- this.connectionId = connectionId;
- this.partition = partition;
- this.policyAccessor = policyAccessor;
- this.storageFromeHandler = storageFromeHandler;
- this.storageReportMessage = new StorageReportFeedMessage(this.connectionId, this.partition, 0, false, 0, 0);
- this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(this.connectionId, 0, 0, null);
- this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
- }
-
- @Override
- public void run() {
- if (mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled()) {
- ackRecords();
- }
- if (mBuffer.isTimeTrackingEnabled()) {
- checkLatencyViolation();
- }
- }
-
- private void ackRecords() {
- Set<Integer> partitions = storageFromeHandler.getPartitionsWithStats();
- List<Integer> basesCovered = new ArrayList<Integer>();
- for (int intakePartition : partitions) {
- Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
- .getBaseAcksForPartition(intakePartition);
- for (Entry<Integer, IntakePartitionStatistics> entry : baseAcks.entrySet()) {
- int base = entry.getKey();
- IntakePartitionStatistics stats = entry.getValue();
- Integer maxIntakeBaseForPartition = maxIntakeBaseCovered.get(intakePartition);
- if (maxIntakeBaseForPartition == null || maxIntakeBaseForPartition < base) {
- tupleCommitAckMessage.reset(intakePartition, base, stats.getAckInfo());
- feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
- } else {
- basesCovered.add(base);
- }
- }
- for (Integer b : basesCovered) {
- baseAcks.remove(b);
- }
- basesCovered.clear();
- }
- }
-
- private void checkLatencyViolation() {
- long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
- if (avgDelayPersistence > policyAccessor.getMaxDelayRecordPersistence()) {
- countDelayExceeded++;
- if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
- storageReportMessage.reset(0, false, mBuffer.getAvgDelayRecordPersistence());
- feedManager.getFeedMessageService().sendMessage(storageReportMessage);
- }
- } else {
- countDelayExceeded = 0;
- }
- }
-
- public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
- maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
- }
- }
-
- public static class LogInputOutputRateTask extends TimerTask {
-
- private final MonitoredBuffer mBuffer;
- private final boolean log;
- private final boolean reportInflow;
- private final boolean reportOutflow;
-
- private final IFeedMessageService messageService;
- private final FeedReportMessage message;
-
- public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
- this.mBuffer = mBuffer;
- this.log = log;
- this.reportInflow = reportInflow;
- this.reportOutflow = reportOutflow;
- if (reportInflow || reportOutflow) {
- ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
- messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
- message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
- vType, 0);
- } else {
- messageService = null;
- message = null;
- }
-
- }
-
- @Override
- public void run() {
- int pendingWork = mBuffer.getWorkSize();
- int outflowRate = mBuffer.getOutflowRate();
- int inflowRate = mBuffer.getInflowRate();
- if (log) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
- + outflowRate + " Pending Work " + pendingWork);
- }
- }
- if (reportInflow) {
- message.reset(inflowRate);
- } else if (reportOutflow) {
- message.reset(outflowRate);
- }
- messageService.sendMessage(message);
- }
- }
-
- public static class MonitorInputQueueLengthTimerTask extends TimerTask {
-
- private final MonitoredBuffer mBuffer;
- private final IFrameEventCallback callback;
- private final int pendingWorkThreshold;
- private final int maxSuccessiveThresholdPeriods;
- private FrameEvent lastEvent = FrameEvent.NO_OP;
- private int pendingWorkExceedCount = 0;
-
- public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
- this.mBuffer = mBuffer;
- this.callback = callback;
- AsterixFeedProperties props = mBuffer.getInputHandler().getFeedManager().getAsterixFeedProperties();
- pendingWorkThreshold = props.getPendingWorkThreshold();
- maxSuccessiveThresholdPeriods = props.getMaxSuccessiveThresholdPeriod();
- }
-
- @Override
- public void run() {
- int pendingWork = mBuffer.getWorkSize();
- if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
- return;
- }
-
- switch (lastEvent) {
- case NO_OP:
- case PENDING_WORK_DONE:
- case FINISHED_PROCESSING_SPILLAGE:
- if (pendingWork > pendingWorkThreshold) {
- pendingWorkExceedCount++;
- if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
- pendingWorkExceedCount = 0;
- lastEvent = FrameEvent.PENDING_WORK_THRESHOLD_REACHED;
- callback.frameEvent(lastEvent);
- }
- } else if (pendingWork == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
- lastEvent = FrameEvent.PENDING_WORK_DONE;
- callback.frameEvent(lastEvent);
- }
- break;
- case PENDING_WORK_THRESHOLD_REACHED:
- if (((pendingWork * 1.0) / pendingWorkThreshold) <= 0.5) {
- lastEvent = FrameEvent.PENDING_WORK_DONE;
- callback.frameEvent(lastEvent);
- }
- break;
- case FINISHED_PROCESSING:
- break;
-
- }
- }
- }
-
- /**
- * A timer task to measure and compare the processing rate and inflow rate
- * to look for possibility to scale-in, that is reduce the degree of cardinality
- * of the compute operator.
- */
- public static class MonitoreProcessRateTimerTask extends TimerTask {
-
- private final MonitoredBuffer mBuffer;
- private final IFeedManager feedManager;
- private int nPartitions;
- private ScaleInReportMessage sMessage;
- private boolean proposedChange;
-
- public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
- FeedConnectionId connectionId, int nPartitions) {
- this.mBuffer = mBuffer;
- this.feedManager = feedManager;
- this.nPartitions = nPartitions;
- this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
- this.proposedChange = false;
- }
-
- public int getNumberOfPartitions() {
- return nPartitions;
- }
-
- public void setNumberOfPartitions(int nPartitions) {
- this.nPartitions = nPartitions;
- proposedChange = false;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
- }
- }
-
- @Override
- public void run() {
- if (!proposedChange) {
- int inflowRate = mBuffer.getInflowRate();
- int procRate = mBuffer.getProcessingRate();
- if (inflowRate > 0 && procRate > 0) {
- if (inflowRate < procRate) {
- int possibleCardinality = (int) Math.ceil(nPartitions * inflowRate / (double) procRate);
- if (possibleCardinality < nPartitions
- && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= 0.25)) {
- sMessage.reset(nPartitions, possibleCardinality);
- feedManager.getFeedMessageService().sendMessage(sMessage);
- proposedChange = true;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Proposed scale-in " + sMessage);
- }
- }
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
- + ")");
- }
- }
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java
deleted file mode 100644
index b654563..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoad.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-
-public class NodeLoad implements Comparable<NodeLoad> {
-
- private final String nodeId;
-
- private int nRuntimes;
-
- public NodeLoad(String nodeId) {
- this.nodeId = nodeId;
- this.nRuntimes = 0;
- }
-
- public void addLoad() {
- nRuntimes++;
- }
-
- public void removeLoad(FeedRuntimeType runtimeType) {
- nRuntimes--;
- }
-
- @Override
- public int compareTo(NodeLoad o) {
- if (this == o) {
- return 0;
- }
- return nRuntimes - o.getnRuntimes();
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public int getnRuntimes() {
- return nRuntimes;
- }
-
- public void setnRuntimes(int nRuntimes) {
- this.nRuntimes = nRuntimes;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java
deleted file mode 100644
index a509341..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReport.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class NodeLoadReport implements Comparable<NodeLoadReport> {
-
- private final String nodeId;
- private float cpuLoad;
- private double usedHeap;
- private int nRuntimes;
-
- public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
- this.nodeId = nodeId;
- this.cpuLoad = cpuLoad;
- this.usedHeap = usedHeap;
- this.nRuntimes = nRuntimes;
- }
-
- public static NodeLoadReport read(JSONObject obj) throws JSONException {
- NodeLoadReport r = new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
- (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
- (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
- obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
- return r;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof NodeLoadReport)) {
- return false;
- }
- return ((NodeLoadReport) o).nodeId.equals(nodeId);
- }
-
- @Override
- public int hashCode() {
- return nodeId.hashCode();
- }
-
- @Override
- public int compareTo(NodeLoadReport o) {
- if (nRuntimes != o.getnRuntimes()) {
- return nRuntimes - o.getnRuntimes();
- } else {
- return (int) (this.cpuLoad - ((NodeLoadReport) o).cpuLoad);
- }
- }
-
- public float getCpuLoad() {
- return cpuLoad;
- }
-
- public void setCpuLoad(float cpuLoad) {
- this.cpuLoad = cpuLoad;
- }
-
- public double getUsedHeap() {
- return usedHeap;
- }
-
- public void setUsedHeap(double usedHeap) {
- this.usedHeap = usedHeap;
- }
-
- public int getnRuntimes() {
- return nRuntimes;
- }
-
- public void setnRuntimes(int nRuntimes) {
- this.nRuntimes = nRuntimes;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
deleted file mode 100644
index 6be0211..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/NodeLoadReportService.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.OperatingSystemMXBean;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-import org.apache.asterix.common.feeds.api.IFeedService;
-import org.apache.asterix.common.feeds.message.NodeReportMessage;
-
-public class NodeLoadReportService implements IFeedService {
-
- private static final int NODE_LOAD_REPORT_FREQUENCY = 2000;
- private static final float CPU_CHANGE_THRESHOLD = 0.2f;
- private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
-
- private final NodeLoadReportTask task;
- private final Timer timer;
-
- public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
- this.task = new NodeLoadReportTask(nodeId, feedManager);
- this.timer = new Timer();
- }
-
- @Override
- public void start() throws Exception {
- timer.schedule(task, 0, NODE_LOAD_REPORT_FREQUENCY);
- }
-
- @Override
- public void stop() {
- timer.cancel();
- }
-
- private static class NodeLoadReportTask extends TimerTask {
-
- private final IFeedManager feedManager;
- private final NodeReportMessage message;
- private final IFeedMessageService messageService;
-
- private static OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
- private static MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
-
- public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
- this.feedManager = feedManager;
- this.message = new NodeReportMessage(0.0f, 0L, 0);
- this.messageService = feedManager.getFeedMessageService();
- }
-
- @Override
- public void run() {
- List<FeedRuntimeId> runtimeIds = feedManager.getFeedConnectionManager().getRegisteredRuntimes();
- int nRuntimes = runtimeIds.size();
- double cpuLoad = getCpuLoad();
- double usedHeap = getUsedHeap();
- if (sendMessage(nRuntimes, cpuLoad, usedHeap)) {
- message.reset(cpuLoad, usedHeap, nRuntimes);
- messageService.sendMessage(message);
- }
- }
-
- private boolean sendMessage(int nRuntimes, double cpuLoad, double usedHeap) {
- if (message == null) {
- return true;
- }
-
- boolean changeInCpu = (Math.abs(cpuLoad - message.getCpuLoad())
- / message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
- boolean changeInUsedHeap = (Math.abs(usedHeap - message.getUsedHeap())
- / message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
- boolean changeInRuntimeSize = nRuntimes != message.getnRuntimes();
- return changeInCpu || changeInUsedHeap || changeInRuntimeSize;
- }
-
- private double getCpuLoad() {
- return osBean.getSystemLoadAverage();
- }
-
- private double getUsedHeap() {
- return ((double) memBean.getHeapMemoryUsage().getUsed()) / memBean.getHeapMemoryUsage().getMax();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java
deleted file mode 100644
index 6f438ad..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/Series.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-
-public abstract class Series {
-
- protected final MetricType type;
- protected int runningSum;
-
- public Series(MetricType type) {
- this.type = type;
- }
-
- public abstract void addValue(int value);
-
- public int getRunningSum() {
- return runningSum;
- }
-
- public MetricType getType() {
- return type;
- }
-
- public abstract void reset();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java
deleted file mode 100644
index 6bfe925..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesAvg.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-
-public class SeriesAvg extends Series {
-
- private int count;
-
- public SeriesAvg() {
- super(MetricType.AVG);
- }
-
- public int getAvg() {
- return runningSum / count;
- }
-
- public synchronized void addValue(int value) {
- if (value < 0) {
- return;
- }
- runningSum += value;
- count++;
- }
-
- public void reset(){
- count = 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java
deleted file mode 100644
index 9d0b0ea..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SeriesRate.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
-
-public class SeriesRate extends Series {
-
- private static final long REFRESH_MEASUREMENT = 5000; // 5 seconds
-
- private int rate;
- private Timer timer;
- private RateComputingTask task;
-
- public SeriesRate() {
- super(MetricType.RATE);
- begin();
- }
-
- public int getRate() {
- return rate;
- }
-
- public synchronized void addValue(int value) {
- if (value < 0) {
- return;
- }
- runningSum += value;
- }
-
- public void begin() {
- if (timer == null) {
- timer = new Timer();
- task = new RateComputingTask(this);
- timer.scheduleAtFixedRate(task, 0, REFRESH_MEASUREMENT);
- }
- }
-
- public void end() {
- if (timer != null) {
- timer.cancel();
- }
- }
-
- public void reset() {
- rate = 0;
- if (task != null) {
- task.reset();
- }
- }
-
- private class RateComputingTask extends TimerTask {
-
- private int lastMeasured = 0;
- private final SeriesRate series;
-
- public RateComputingTask(SeriesRate series) {
- this.series = series;
- }
-
- @Override
- public void run() {
- int currentValue = series.getRunningSum();
- rate = (int) (((currentValue - lastMeasured) * 1000) / REFRESH_MEASUREMENT);
- lastMeasured = currentValue;
- }
-
- public void reset() {
- lastMeasured = 0;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java
deleted file mode 100644
index 5b99b8c..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageFrameHandler.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class StorageFrameHandler {
-
- private final Map<Integer, Map<Integer, IntakePartitionStatistics>> intakeStatistics;
- private long avgDelayPersistence;
-
- public StorageFrameHandler() {
- intakeStatistics = new HashMap<Integer, Map<Integer, IntakePartitionStatistics>>();
- avgDelayPersistence = 0L;
- }
-
- public synchronized void updateTrackingInformation(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
- int nTuples = frameAccessor.getTupleCount();
- long delay = 0;
- long intakeTimestamp;
- long currentTime = System.currentTimeMillis();
- int partition = 0;
- int recordId = 0;
- for (int i = 0; i < nTuples; i++) {
- int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
- int openPartOffsetOrig = frame.getInt(recordStart + 6);
- int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
- int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
- + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
- recordId = frame.getInt(recordStart + recordIdOffset);
-
- int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2) + 1;
- partition = frame.getInt(recordStart + partitionOffset);
-
- ackRecordId(partition, recordId);
- int intakeTimestampValueOffset = partitionOffset + 4 + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2)
- + 1;
- intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
-
- int storeTimestampValueOffset = intakeTimestampValueOffset + 8
- + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
- frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
- delay += currentTime - intakeTimestamp;
- }
- avgDelayPersistence = delay / nTuples;
- }
-
- private void ackRecordId(int partition, int recordId) {
- Map<Integer, IntakePartitionStatistics> map = intakeStatistics.get(partition);
- if (map == null) {
- map = new HashMap<Integer, IntakePartitionStatistics>();
- intakeStatistics.put(partition, map);
- }
- int base = (int) Math.ceil(recordId * 1.0 / IntakePartitionStatistics.ACK_WINDOW_SIZE);
- IntakePartitionStatistics intakeStatsForBaseOfPartition = map.get(base);
- if (intakeStatsForBaseOfPartition == null) {
- intakeStatsForBaseOfPartition = new IntakePartitionStatistics(partition, base);
- map.put(base, intakeStatsForBaseOfPartition);
- }
- intakeStatsForBaseOfPartition.ackRecordId(recordId);
- }
-
- public byte[] getAckData(int partition, int base) {
- Map<Integer, IntakePartitionStatistics> intakeStats = intakeStatistics.get(partition);
- if (intakeStats != null) {
- IntakePartitionStatistics intakePartitionStats = intakeStats.get(base);
- if (intakePartitionStats != null) {
- return intakePartitionStats.getAckInfo();
- }
- }
- return null;
- }
-
- public synchronized Map<Integer, IntakePartitionStatistics> getBaseAcksForPartition(int partition) {
- Map<Integer, IntakePartitionStatistics> intakeStatsForPartition = intakeStatistics.get(partition);
- Map<Integer, IntakePartitionStatistics> clone = new HashMap<Integer, IntakePartitionStatistics>();
- for (Entry<Integer, IntakePartitionStatistics> entry : intakeStatsForPartition.entrySet()) {
- clone.put(entry.getKey(), entry.getValue());
- }
- return intakeStatsForPartition;
- }
-
- public long getAvgDelayPersistence() {
- return avgDelayPersistence;
- }
-
- public void setAvgDelayPersistence(long avgDelayPersistence) {
- this.avgDelayPersistence = avgDelayPersistence;
- }
-
- public Set<Integer> getPartitionsWithStats() {
- return intakeStatistics.keySet();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
deleted file mode 100644
index 4027237..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/StorageSideMonitoredBuffer.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class StorageSideMonitoredBuffer extends MonitoredBuffer {
-
- private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 10
- // seconds
-
- private boolean ackingEnabled;
- private final boolean timeTrackingEnabled;
-
- public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
- IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
- IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
- FeedPolicyAccessor policyAccessor) {
- super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
- exceptionHandler, callback, nPartitions, policyAccessor);
- timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
- ackingEnabled = policyAccessor.atleastOnceSemantics();
- if (ackingEnabled || timeTrackingEnabled) {
- storageFromeHandler = new StorageFrameHandler();
- this.storageTimeTrackingRateTask = new MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask(this,
- inputHandler.getFeedManager(), connectionId, runtimeId.getPartition(), policyAccessor,
- storageFromeHandler);
- this.timer.scheduleAtFixedRate(storageTimeTrackingRateTask, 0, STORAGE_TIME_TRACKING_FREQUENCY);
- }
- }
-
- @Override
- protected boolean monitorProcessingRate() {
- return false;
- }
-
- @Override
- protected boolean logInflowOutflowRate() {
- return true;
- }
-
- @Override
- public IFramePreprocessor getFramePreProcessor() {
- return new IFramePreprocessor() {
-
- @Override
- public void preProcess(ByteBuffer frame) {
- try {
- if (ackingEnabled) {
- storageFromeHandler.updateTrackingInformation(frame, inflowFta);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- }
-
- @Override
- protected IFramePostProcessor getFramePostProcessor() {
- return new IFramePostProcessor() {
-
- private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
- private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
-
- private long delayNormalWindow = 0;
- private long delayHighWindow = 0;
- private long delayLowWindow = 0;
-
- private int countNormalWindow;
- private int countHighWindow;
- private int countLowWindow;
-
- private long beginIntakeTimestamp = 0;
-
- @Override
- public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
- if (ackingEnabled || timeTrackingEnabled) {
- int nTuples = frameAccessor.getTupleCount();
- long intakeTimestamp;
- long currentTime = System.currentTimeMillis();
- for (int i = 0; i < nTuples; i++) {
- int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
- int openPartOffsetOrig = frame.getInt(recordStart + 6);
- int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
- int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
- + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
-
- int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
- + 1;
-
- int intakeTimestampValueOffset = partitionOffset + 4
- + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
- intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
- if (beginIntakeTimestamp == 0) {
- beginIntakeTimestamp = intakeTimestamp;
- LOGGER.warning("Begin Timestamp: " + beginIntakeTimestamp);
- }
-
- updateRunningAvg(intakeTimestamp, currentTime);
-
- int storeTimestampValueOffset = intakeTimestampValueOffset + 8
- + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
- frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
- }
- logRunningAvg();
- resetRunningAvg();
- }
- }
-
- private void updateRunningAvg(long intakeTimestamp, long currentTime) {
- long diffTimestamp = intakeTimestamp - beginIntakeTimestamp;
- long delay = (currentTime - intakeTimestamp);
- if (diffTimestamp < NORMAL_WINDOW_LIMIT) {
- delayNormalWindow += delay;
- countNormalWindow++;
- } else if (diffTimestamp < HIGH_WINDOW_LIMIT) {
- delayHighWindow += delay;
- countHighWindow++;
- } else {
- delayLowWindow += delay;
- countLowWindow++;
- }
- }
-
- private void resetRunningAvg() {
- delayNormalWindow = 0;
- countNormalWindow = 0;
- delayHighWindow = 0;
- countHighWindow = 0;
- delayLowWindow = 0;
- countLowWindow = 0;
- }
-
- private void logRunningAvg() {
- if (countNormalWindow != 0 && delayNormalWindow != 0) {
- LOGGER.warning("Window:" + 0 + ":" + "Avg Travel_Time:" + (delayNormalWindow / countNormalWindow));
- }
- if (countHighWindow != 0 && delayHighWindow != 0) {
- LOGGER.warning("Window:" + 1 + ":" + "Avg Travel_Time:" + (delayHighWindow / countHighWindow));
- }
- if (countLowWindow != 0 && delayLowWindow != 0) {
- LOGGER.warning("Window:" + 2 + ":" + "Avg Travel_Time:" + (delayLowWindow / countLowWindow));
- }
- }
-
- };
- }
-
- public boolean isAckingEnabled() {
- return ackingEnabled;
- }
-
- public void setAcking(boolean ackingEnabled) {
- this.ackingEnabled = ackingEnabled;
- }
-
- public boolean isTimeTrackingEnabled() {
- return timeTrackingEnabled;
- }
-
- @Override
- protected boolean monitorInputQueueLength() {
- return true;
- }
-
- @Override
- protected boolean reportOutflowRate() {
- return true;
- }
-
- @Override
- protected boolean reportInflowRate() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
deleted file mode 100644
index 7eb5921..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableFeedRuntimeId.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-
-public class SubscribableFeedRuntimeId extends FeedRuntimeId {
- private static final long serialVersionUID = 1L;
- private final FeedId feedId;
-
- public SubscribableFeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition) {
- super(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
- this.feedId = feedId;
- }
-
- public FeedId getFeedId() {
- return feedId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof SubscribableFeedRuntimeId)) {
- return false;
- }
-
- return (super.equals(o) && this.feedId.equals(((SubscribableFeedRuntimeId) o).getFeedId()));
- }
-
- @Override
- public int hashCode() {
- return super.hashCode() + feedId.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java
deleted file mode 100644
index b09b3ff..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/SubscribableRuntime.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class SubscribableRuntime extends FeedRuntime implements ISubscribableRuntime {
-
- protected static final Logger LOGGER = Logger.getLogger(SubscribableRuntime.class.getName());
-
- protected final FeedId feedId;
- protected final List<ISubscriberRuntime> subscribers;
- protected final RecordDescriptor recordDescriptor;
- protected final DistributeFeedFrameWriter dWriter;
-
- public SubscribableRuntime(FeedId feedId, FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler,
- DistributeFeedFrameWriter dWriter, RecordDescriptor recordDescriptor) {
- super(runtimeId, inputHandler, dWriter);
- this.feedId = feedId;
- this.recordDescriptor = recordDescriptor;
- this.dWriter = dWriter;
- this.subscribers = new ArrayList<ISubscriberRuntime>();
- }
-
- public FeedId getFeedId() {
- return feedId;
- }
-
- @Override
- public String toString() {
- return "SubscribableRuntime" + " [" + feedId + "]" + "(" + runtimeId + ")";
- }
-
- @Override
- public synchronized void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime)
- throws Exception {
- FeedFrameCollector collector = dWriter.subscribeFeed(new FeedPolicyAccessor(collectionRuntime.getFeedPolicy()),
- collectionRuntime.getInputHandler(), collectionRuntime.getConnectionId());
- collectionRuntime.setFrameCollector(collector);
- subscribers.add(collectionRuntime);
- }
-
- @Override
- public synchronized void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
- dWriter.unsubscribeFeed(collectionRuntime.getFeedFrameWriter());
- subscribers.remove(collectionRuntime);
- }
-
- @Override
- public synchronized List<ISubscriberRuntime> getSubscribers() {
- return subscribers;
- }
-
- @Override
- public DistributeFeedFrameWriter getFeedFrameWriter() {
- return dWriter;
- }
-
- public FeedRuntimeType getFeedRuntimeType() {
- return runtimeId.getFeedRuntimeType();
- }
-
- @Override
- public RecordDescriptor getRecordDescriptor() {
- return recordDescriptor;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
deleted file mode 100644
index 2eb6caa..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IAdapterRuntimeManager.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import org.apache.asterix.common.feeds.FeedId;
-
-public interface IAdapterRuntimeManager {
-
- public enum State {
- /**
- * Indicates that AsterixDB is maintaining the flow of data from external source into its storage.
- */
- ACTIVE_INGESTION,
-
- /**
- * Indicates that data from external source is being buffered and not
- * pushed downstream
- */
-
- INACTIVE_INGESTION,
- /**
- * Indicates that feed ingestion activity has finished.
- */
- FINISHED_INGESTION,
-
- /** Indicates the occurrence of a failure during the intake stage of a data ingestion pipeline **/
- FAILED_INGESTION
- }
-
- /**
- * Start feed ingestion
- * @throws Exception
- */
- public void start() throws Exception;
-
- /**
- * Stop feed ingestion.
- * @throws Exception
- */
- public void stop() throws Exception;
-
- /**
- * @return feedId associated with the feed that is being ingested
- */
- public FeedId getFeedId();
-
- /**
- * @return the instance of the feed adapter (an implementation of {@code IFeedAdapter}) in use.
- */
- public IDataSourceAdapter getFeedAdapter();
-
- /**
- * @return state associated with the AdapterRuntimeManager. See {@code State}.
- */
- public State getState();
-
- /**
- * @param state
- */
- public void setState(State state);
-
- public IIntakeProgressTracker getProgressTracker();
-
- public int getPartition();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java
deleted file mode 100644
index 0092e6b..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/ICentralFeedManager.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface ICentralFeedManager {
-
- public void start() throws AsterixException;
-
- public void stop() throws AsterixException, IOException;
-
- public IFeedTrackingManager getFeedTrackingManager();
-
- public IFeedLoadManager getFeedLoadManager();
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
deleted file mode 100644
index 9dd4e76..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IDataSourceAdapter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.io.Serializable;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * A super interface implemented by a data source adapter. An adapter can be a
- * pull based or push based. This interface provides all common APIs that need
- * to be implemented by each adapter irrespective of the the kind of
- * adapter(pull or push).
- */
-public interface IDataSourceAdapter extends Serializable {
-
- /**
- * Triggers the adapter to begin ingesting data from the external source.
- *
- * @param partition
- * The adapter could be running with a degree of parallelism.
- * partition corresponds to the i'th parallel instance.
- * @param writer
- * The instance of frame writer that is used by the adapter to
- * write frame to. Adapter packs the fetched bytes (from external source),
- * packs them into frames and forwards the frames to an upstream receiving
- * operator using the instance of IFrameWriter.
- * @throws Exception
- */
- public void start(int partition, IFrameWriter writer) throws Exception;
-
- /**
- * Discontinue the ingestion of data.
- *
- * @throws Exception
- */
- public boolean stop() throws Exception;
-
- /**
- * @param e
- * @return true if the ingestion should continue post the exception else false
- */
- public boolean handleException(Throwable e);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java
deleted file mode 100644
index db2c890..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IExceptionHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Handles an exception encountered during processing of a data frame.
- * In the case when the exception is of type {@code FrameDataException}, the causing
- * tuple is logged and a new frame with tuple after the exception-generating tuple
- * is returned. This funcitonality is used during feed ingestion to bypass an exception
- * generating tuple and thus avoid the data flow from terminating
- */
-public interface IExceptionHandler {
-
- /**
- * @param e
- * the exception that needs to be handled
- * @param frame
- * the frame that was being processed when exception occurred
- * @return returns a new frame with tuples after the exception generating tuple
- * @throws HyracksDataException
- */
- public ByteBuffer handleException(Exception e, ByteBuffer frame);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java
deleted file mode 100644
index 014a868..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedConnectionManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeManager;
-
-/**
- * Handle (de)registration of feeds for delivery of control messages.
- */
-public interface IFeedConnectionManager {
-
- /**
- * Allows registration of a feedRuntime.
- *
- * @param feedRuntime
- * @throws Exception
- */
- public void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) throws Exception;
-
- /**
- * Obtain feed runtime corresponding to a feedRuntimeId
- *
- * @param feedRuntimeId
- * @return
- */
- public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
-
- /**
- * De-register a feed
- *
- * @param feedConnection
- * @throws IOException
- */
- void deregisterFeed(FeedConnectionId feedConnection);
-
- /**
- * Obtain the feed runtime manager associated with a feed.
- *
- * @param feedConnection
- * @return
- */
- public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedConnection);
-
- /**
- * Allows de-registration of a feed runtime.
- *
- * @param feedRuntimeId
- */
- void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId);
-
- public List<FeedRuntimeId> getRegisteredRuntimes();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java
deleted file mode 100644
index 3a95e51..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/api/IFeedFrameHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds.api;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.asterix.common.feeds.DataBucket;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFeedFrameHandler {
-
- public void handleFrame(ByteBuffer frame) throws HyracksDataException;
-
- public void handleDataBucket(DataBucket bucket);
-
- public void close();
-
- public Iterator<ByteBuffer> replayData() throws HyracksDataException;
-
- public String getSummary();
-
-}