You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/15 19:03:56 UTC
[3/9] incubator-asterixdb git commit: Cleanup Feed CodeBase
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
deleted file mode 100644
index 67e6295..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-
-import org.apache.asterix.external.feed.api.IExceptionHandler;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.api.IFrameEventCallback;
-import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.external.feed.api.IFramePostProcessor;
-import org.apache.asterix.external.feed.api.IFramePreprocessor;
-import org.apache.asterix.external.feed.dataflow.DataBucket;
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
-import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.LogInputOutputRateTask;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
-import org.apache.asterix.external.feed.watch.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
-import org.apache.asterix.external.util.FeedFrameUtil;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
-
- protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds (timer period in ms)
- protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second (timer period in ms)
- protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds (timer period in ms)
-
- protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 2nd frame
-
- protected final IHyracksTaskContext ctx;
- protected final FeedConnectionId connectionId;
- protected final FeedRuntimeId runtimeId;
- protected final FrameTupleAccessor inflowFta;
- protected final FrameTupleAccessor outflowFta;
- protected final FeedRuntimeInputHandler inputHandler;
- protected final IFrameEventCallback callback;
- protected final Timer timer;
- private final IExceptionHandler exceptionHandler;
- protected final FeedPolicyAccessor policyAccessor;
- protected int nPartitions;
-
- private IFrameWriter frameWriter;
- protected IFeedMetricCollector metricCollector;
- protected boolean monitorProcessingRate = false;
- protected boolean monitorInputQueueLength = false;
- protected boolean logInflowOutflowRate = false;
- protected boolean reportOutflowRate = false;
- protected boolean reportInflowRate = false;
-
- protected int inflowReportSenderId = -1;
- protected int outflowReportSenderId = -1;
- protected TimerTask monitorInputQueueLengthTask;
- protected TimerTask processingRateTask;
- protected TimerTask logInflowOutflowRateTask;
- protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
- protected StorageFrameHandler storageFromeHandler;
-
- protected int processingRate = -1;
- protected int frameCount = 0;
- private long avgDelayPersistence = 0;
- private boolean active;
- private Map<Integer, Long> tupleTimeStats;
- IFramePostProcessor postProcessor = null;
- IFramePreprocessor preProcessor = null;
-
- /**
-  * Creates the monitored buffer implementation matching the runtime type reported by
-  * {@code runtimeId}. COMPUTE, STORE and COLLECT each map to a dedicated subclass; every
-  * other type falls back to {@code BasicMonitoredBuffer}. All arguments are forwarded
-  * unchanged to the selected constructor.
-  */
- public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
-         IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-         IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-         IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
-         FeedPolicyAccessor policyAccessor) {
-     final MonitoredBuffer buffer;
-     switch (runtimeId.getFeedRuntimeType()) {
-         case COMPUTE:
-             buffer = new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
-                     metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
-                     policyAccessor);
-             break;
-         case STORE:
-             buffer = new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
-                     metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
-                     policyAccessor);
-             break;
-         case COLLECT:
-             buffer = new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
-                     metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
-                     policyAccessor);
-             break;
-         default:
-             buffer = new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
-                     metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
-                     policyAccessor);
-             break;
-     }
-     return buffer;
- }
-
- /**
-  * Stores the supplied collaborators, builds separate inflow and outflow tuple accessors
-  * over {@code recordDesc}, creates the timer that drives the monitoring tasks, marks the
-  * buffer active and finally starts monitoring.
-  * <p>
-  * The {@code fta} argument is accepted but not used by this constructor.
-  */
- protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
-         FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
-         FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
-         IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
-     // Identity and configuration of the runtime this buffer serves.
-     this.ctx = ctx;
-     this.runtimeId = runtimeId;
-     this.connectionId = connectionId;
-     this.nPartitions = nPartitions;
-     this.policyAccessor = policyAccessor;
-
-     // Collaborators handed in by the caller.
-     this.inputHandler = inputHandler;
-     this.frameWriter = frameWriter;
-     this.metricCollector = metricCollector;
-     this.exceptionHandler = exceptionHandler;
-     this.callback = callback;
-
-     // State owned by this buffer.
-     this.inflowFta = new FrameTupleAccessor(recordDesc);
-     this.outflowFta = new FrameTupleAccessor(recordDesc);
-     this.timer = new Timer();
-     this.active = true;
-
-     // Must run last: it reads the fields assigned above.
-     initializeMonitoring();
- }
-
- /** Queried once by initializeMonitoring(); the result is stored in the monitorProcessingRate flag. */
- protected abstract boolean monitorProcessingRate();
-
- /** Queried by initializeMonitoring() only when the policy does not already enable statistics logging. */
- protected abstract boolean logInflowOutflowRate();
-
- /** Queried once by initializeMonitoring(); the result is stored in the reportOutflowRate flag. */
- protected abstract boolean reportOutflowRate();
-
- /** Queried once by initializeMonitoring(); the result is stored in the reportInflowRate flag. */
- protected abstract boolean reportInflowRate();
-
- /** Queried once by initializeMonitoring(); the result is stored in the monitorInputQueueLength flag. */
- protected abstract boolean monitorInputQueueLength();
-
- /** Supplies the pre-processor used by preProcessFrame(); a null result means no pre-processing is done. */
- protected abstract IFramePreprocessor getFramePreProcessor();
-
- /** Supplies the post-processor used by postProcessFrame(ByteBuffer); a null result means no post-processing is done. */
- protected abstract IFramePostProcessor getFramePostProcessor();
-
- /**
-  * Reads the monitoring switches from the subclass hooks and the feed policy, then schedules
-  * the corresponding timer tasks:
-  * <ul>
-  * <li>a processing-rate task when processing-rate monitoring is on and the policy is elastic;</li>
-  * <li>an input-queue-length task when queue monitoring is on and the policy is elastic,
-  * throttles, spills to disk or discards on congestion;</li>
-  * <li>an inflow/outflow rate task when inflow or outflow reporting is on.</li>
-  * </ul>
-  * The inflow/outflow report senders are registered with the metric collector <em>before</em>
-  * the rate task is scheduled. The task is scheduled with zero delay and reads both sender
-  * ids from the timer thread, so registering afterwards would let its first run observe the
-  * initial id of -1.
-  */
- protected void initializeMonitoring() {
-     monitorProcessingRate = monitorProcessingRate();
-     monitorInputQueueLength = monitorInputQueueLength();
-     reportInflowRate = reportInflowRate();
-     reportOutflowRate = reportOutflowRate();
-     logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
-
-     if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
-         this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
-                 connectionId, nPartitions);
-         this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
-     }
-
-     if (monitorInputQueueLength && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
-             || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
-         this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
-         this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
-     }
-
-     if (reportInflowRate || reportOutflowRate) {
-         // Register the senders first so the task never sees an unassigned sender id.
-         this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
-                 ValueType.INFLOW_RATE, MetricType.RATE);
-         this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
-                 ValueType.OUTFLOW_RATE, MetricType.RATE);
-         this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
-                 reportOutflowRate);
-         this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
-     }
- }
-
- /**
-  * Stops the monitoring tasks started by initializeMonitoring(): cancels the queue-length and
-  * processing-rate tasks if they were created and, when inflow or outflow reporting is on,
-  * removes both report senders and cancels the rate task. The timer itself is left running.
-  */
- protected void deinitializeMonitoring() {
-     final TimerTask[] optionalTasks = { monitorInputQueueLengthTask, processingRateTask };
-     for (TimerTask task : optionalTasks) {
-         if (task != null) {
-             task.cancel();
-         }
-     }
-
-     final boolean sendersRegistered = reportInflowRate || reportOutflowRate;
-     if (sendersRegistered) {
-         metricCollector.removeReportSender(inflowReportSenderId);
-         metricCollector.removeReportSender(outflowReportSenderId);
-         logInflowOutflowRateTask.cancel();
-     }
-
-     if (LOGGER.isLoggable(Level.INFO)) {
-         LOGGER.info("Disabled monitoring for " + this.runtimeId);
-     }
- }
-
- /**
-  * Bookkeeping performed after a frame has been handed to the frame writer.
-  * <p>
-  * When processing-rate monitoring is on, every {@code PROCESS_RATE_REFRESH}-th frame
-  * recomputes {@code processingRate} from the outflow accessor's tuple count and the time
-  * elapsed since {@code startTime}. It then sends the tuple count to the outflow report
-  * sender when logging or outflow reporting is on, and finally runs the frame post-processor.
-  *
-  * @param startTime value of {@code System.currentTimeMillis()} taken before the frame was processed
-  * @param frame the frame that was just processed
-  * @throws Exception whatever the frame post-processor throws
-  */
- protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
-     if (monitorProcessingRate) {
-         frameCount++;
-         if (frameCount % PROCESS_RATE_REFRESH == 0) {
-             // A frame can be processed within the same millisecond (or the wall clock can step
-             // back); clamp to 1 ms so the division never yields Infinity/NaN, which the int
-             // cast would turn into Integer.MAX_VALUE or a meaningless rate.
-             long elapsedMillis = Math.max(1L, System.currentTimeMillis() - startTime);
-             processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / elapsedMillis);
-             if (LOGGER.isLoggable(Level.INFO)) {
-                 LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
-             }
-             frameCount = 0;
-         }
-     }
-
-     if (logInflowOutflowRate || reportOutflowRate) {
-         metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
-     }
-
-     postProcessFrame(frame);
- }
-
- /**
-  * Runs the frame pre-processor on {@code frame}. The pre-processor is requested from
-  * getFramePreProcessor() for as long as none has been cached; nothing happens if that
-  * still yields null.
-  */
- protected void preProcessFrame(ByteBuffer frame) throws Exception {
-     IFramePreprocessor current = preProcessor;
-     if (current == null) {
-         current = getFramePreProcessor();
-         preProcessor = current;
-     }
-     if (current == null) {
-         return;
-     }
-     current.preProcess(frame);
- }
-
- /**
-  * Runs the frame post-processor on {@code frame}. The post-processor is requested from
-  * getFramePostProcessor() for as long as none has been cached. When one is available the
-  * outflow accessor is reset onto the frame and both are passed to it; otherwise nothing
-  * happens.
-  */
- protected void postProcessFrame(ByteBuffer frame) throws Exception {
-     IFramePostProcessor current = postProcessor;
-     if (current == null) {
-         current = getFramePostProcessor();
-         postProcessor = current;
-     }
-     if (current == null) {
-         return;
-     }
-     outflowFta.reset(frame);
-     current.postProcessFrame(frame, outflowFta);
- }
-
- /** Adds the bucket to the inbox; it is handled later through processMessage(DataBucket). */
- @Override
- public void sendMessage(DataBucket message) {
- inbox.add(message);
- }
-
- /**
-  * Sends the number of tuples in {@code frame} to the inflow report sender. Nothing is
-  * reported when inflow reporting is off or while the input handler is in PROCESS_BACKLOG
-  * or PROCESS_SPILL mode.
-  */
- public void sendReport(ByteBuffer frame) {
-     if (!reportInflowRate) {
-         return;
-     }
-     if (inputHandler.getMode().equals(Mode.PROCESS_BACKLOG)) {
-         return;
-     }
-     if (inputHandler.getMode().equals(Mode.PROCESS_SPILL)) {
-         return;
-     }
-     inflowFta.reset(frame);
-     metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
- }
-
- /**
-  * Returns the metric collector's current value for the inflow report sender
-  * (rate in terms of tuples/sec).
-  */
- public int getInflowRate() {
- return metricCollector.getMetric(inflowReportSenderId);
- }
-
- /**
-  * Returns the metric collector's current value for the outflow report sender
-  * (rate in terms of tuples/sec).
-  */
- public int getOutflowRate() {
- return metricCollector.getMetric(outflowReportSenderId);
- }
-
- /** Returns the number of pending entries in the input queue (the size of the inbox). */
- public int getWorkSize() {
- return inbox.size();
- }
-
- /**
-  * Resets the number of partitions (cardinality) tracked by the processing-rate task.
-  * Does nothing when no such task was created or when it already holds {@code nPartitions}.
-  */
- public void setNumberOfPartitions(int nPartitions) {
-     if (processingRateTask == null) {
-         return;
-     }
-     final MonitoreProcessRateTimerTask rateTask = (MonitoreProcessRateTimerTask) processingRateTask;
-     final boolean unchanged = rateTask.getNumberOfPartitions() == nPartitions;
-     if (!unchanged) {
-         rateTask.setNumberOfPartitions(nPartitions);
-     }
- }
-
- /** Returns the input handler this buffer was created with. */
- public FeedRuntimeInputHandler getInputHandler() {
- return inputHandler;
- }
-
- /**
-  * Closes the buffer: delegates to super.close(processPending), optionally tears down the
-  * monitoring tasks, and then marks the buffer inactive so that later calls to
-  * processMessage(DataBucket) only release the message without processing it.
-  *
-  * @param processPending forwarded unchanged to super.close
-  * @param disableMonitoring when true, deinitializeMonitoring() is invoked
-  */
- public synchronized void close(boolean processPending, boolean disableMonitoring) {
- super.close(processPending);
- if (disableMonitoring) {
- deinitializeMonitoring();
- }
- active = false;
- }
-
- /**
-  * Handles one bucket taken from the inbox.
-  * <ul>
-  * <li>If the buffer is inactive, the bucket is released and nothing else happens.</li>
-  * <li>DATA: optionally throttles the frame, runs pre-processing, forwards the frame to the
-  * frame writer, runs post-processing, then releases the bucket. An exception raised while
-  * doing so is logged and passed to the exception handler; the frame is not retried.</li>
-  * <li>EOD: releases the bucket, cancels the timer and signals FINISHED_PROCESSING.</li>
-  * <li>EOSD: releases the bucket and signals FINISHED_PROCESSING_SPILLAGE.</li>
-  * </ul>
-  * Throttling is applied only once a positive processing rate has been measured. The rate
-  * starts at -1, and using that value would produce a negative retain fraction and therefore
-  * a negative number of tuples to sample.
-  *
-  * @param message the bucket to process
-  * @throws Exception if throttling or the exception handler itself fails
-  */
- @Override
- public synchronized void processMessage(DataBucket message) throws Exception {
-     if (!active) {
-         message.doneReading();
-         return;
-     }
-     switch (message.getContentType()) {
-         case DATA:
-             ByteBuffer frameReceived = message.getContent();
-             ByteBuffer frameToProcess = frameReceived;
-             if (inputHandler.isThrottlingEnabled()) {
-                 inflowFta.reset(frameReceived);
-                 int pRate = getProcessingRate();
-                 int inflowRate = getInflowRate();
-                 // pRate <= 0 means "not measured yet"; there is nothing to throttle against.
-                 if (pRate > 0 && inflowRate > pRate) {
-                     double retainFraction = (pRate * 0.8 / inflowRate);
-                     frameToProcess = throttleFrame(inflowFta, retainFraction);
-                     inflowFta.reset(frameToProcess);
-                     if (LOGGER.isLoggable(Level.INFO)) {
-                         LOGGER.info("Throttling at fraction " + retainFraction + "inflow rate " + inflowRate
-                                 + " no of tuples remaining " + inflowFta.getTupleCount());
-                     }
-                 }
-             }
-             outflowFta.reset(frameToProcess);
-             try {
-                 inflowFta.reset(frameToProcess);
-                 long startTime = System.currentTimeMillis();
-                 preProcessFrame(frameToProcess);
-                 frameWriter.nextFrame(frameToProcess);
-                 postProcessFrame(startTime, frameToProcess);
-             } catch (Exception e) {
-                 if (LOGGER.isLoggable(Level.WARNING)) {
-                     LOGGER.log(Level.WARNING, "Exception while processing frame for " + runtimeId, e);
-                 }
-                 // As before, the frame returned by the handler is not re-submitted.
-                 exceptionHandler.handleException(e, frameToProcess);
-             }
-             message.doneReading();
-             break;
-         case EOD:
-             message.doneReading();
-             timer.cancel();
-             callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
-             break;
-         case EOSD:
-             if (LOGGER.isLoggable(Level.INFO)) {
-                 LOGGER.info("Done processing spillage");
-             }
-             message.doneReading();
-             callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
-             break;
-         default:
-             // Other content types are ignored, exactly as before.
-             break;
-     }
- }
-
- /**
-  * Samples the frame behind {@code fta} down to {@code retainFraction} of its tuple count
-  * (truncated to an int) using FeedFrameUtil.getSampledFrame.
-  */
- private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
-     final double scaledCount = fta.getTupleCount() * retainFraction;
-     final int desiredTuples = (int) scaledCount;
-     return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
- }
-
- /** Returns the current mode of the input handler. */
- public Mode getMode() {
- return inputHandler.getMode();
- }
-
- /** Returns the id of the runtime this buffer was created for. */
- public FeedRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
- /** Replaces the frame writer that processMessage(DataBucket) forwards DATA frames to. */
- public void setFrameWriter(IFrameWriter frameWriter) {
- this.frameWriter = frameWriter;
- }
-
- /**
-  * Marks the buffer active again and, when rate logging is on, resets both report senders.
-  */
- // NOTE(review): the senders are created only when reportInflowRate || reportOutflowRate
- // (see initializeMonitoring), but this guard tests logInflowOutflowRate. With logging on and
- // reporting off the ids are still -1 here -- confirm how the metric collector treats that.
- public void reset() {
- active = true;
- if (logInflowOutflowRate) {
- metricCollector.resetReportSender(inflowReportSenderId);
- metricCollector.resetReportSender(outflowReportSenderId);
- }
- }
-
- /** Returns the last computed processing rate, or -1 if none has been computed yet. */
- public int getProcessingRate() {
- return processingRate;
- }
-
- /** Returns the tupleTimeStats field; this class never assigns it, so the result is null here. */
- public Map<Integer, Long> getTupleTimeStats() {
- return tupleTimeStats;
- }
-
- /** Returns the avgDelayPersistence field; this class never assigns it, so the result is 0 here. */
- public long getAvgDelayRecordPersistence() {
- return avgDelayPersistence;
- }
-
- /** Returns the storage time-tracking task, which this class declares but does not create. */
- public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
- return storageTimeTrackingRateTask;
- }
-
- /** Delegates to the input handler's flush(). */
- @Override
- public void emptyInbox() throws HyracksDataException {
- inputHandler.flush();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
deleted file mode 100644
index 86c6bca..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBufferTimerTasks.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.feed.api.IFrameEventCallback;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.api.IFrameEventCallback.FrameEvent;
-import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedReportMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
-import org.apache.asterix.external.feed.message.ScaleInReportMessage;
-import org.apache.asterix.external.feed.message.StorageReportFeedMessage;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-
-public class MonitoredBufferTimerTasks {
-
- private static final Logger LOGGER = Logger.getLogger(MonitorInputQueueLengthTimerTask.class.getName());
-
- /**
-  * Timer task for the storage side of a feed. On every run it
-  * <ul>
-  * <li>sends tuple-commit acks for the bases recorded by the storage frame handler, when
-  * acking is enabled and throttling is not; and</li>
-  * <li>checks the average persistence delay against the policy maximum when time tracking is
-  * enabled, sending a storage report once the limit has been exceeded on more than
-  * {@code PERSISTENCE_DELAY_VIOLATION_MAX} successive runs.</li>
-  * </ul>
-  */
- public static class MonitoredBufferStorageTimerTask extends TimerTask {
-
-     private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
-
-     private final StorageSideMonitoredBuffer mBuffer;
-     private final IFeedManager feedManager;
-     private final int partition;
-     private final FeedConnectionId connectionId;
-     private final FeedPolicyAccessor policyAccessor;
-     private final StorageFrameHandler storageFromeHandler;
-     // Both messages are allocated once and re-populated via reset(...) before each send.
-     private final StorageReportFeedMessage storageReportMessage;
-     private final FeedTupleCommitAckMessage tupleCommitAckMessage;
-
-     // Highest acked window per intake partition, as reported by receiveCommitAckResponse.
-     private Map<Integer, Integer> maxIntakeBaseCovered;
-     private int countDelayExceeded = 0;
-
-     public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
-             FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
-             StorageFrameHandler storageFromeHandler) {
-         this.mBuffer = mBuffer;
-         this.feedManager = feedManager;
-         this.connectionId = connectionId;
-         this.partition = partition;
-         this.policyAccessor = policyAccessor;
-         this.storageFromeHandler = storageFromeHandler;
-         this.storageReportMessage = new StorageReportFeedMessage(this.connectionId, this.partition, 0, false, 0, 0);
-         this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(this.connectionId, 0, 0, null);
-         this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
-     }
-
-     @Override
-     public void run() {
-         if (mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled()) {
-             ackRecords();
-         }
-         if (mBuffer.isTimeTrackingEnabled()) {
-             checkLatencyViolation();
-         }
-     }
-
-     /**
-      * For every intake partition with statistics, sends an ack for each base beyond the
-      * highest one already acknowledged and drops the bases that are already covered.
-      */
-     private void ackRecords() {
-         Set<Integer> partitions = storageFromeHandler.getPartitionsWithStats();
-         List<Integer> basesCovered = new ArrayList<Integer>();
-         for (int intakePartition : partitions) {
-             Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
-                     .getBaseAcksForPartition(intakePartition);
-             for (Entry<Integer, IntakePartitionStatistics> entry : baseAcks.entrySet()) {
-                 int base = entry.getKey();
-                 IntakePartitionStatistics stats = entry.getValue();
-                 Integer maxIntakeBaseForPartition = maxIntakeBaseCovered.get(intakePartition);
-                 if (maxIntakeBaseForPartition == null || maxIntakeBaseForPartition < base) {
-                     tupleCommitAckMessage.reset(intakePartition, base, stats.getAckInfo());
-                     feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
-                 } else {
-                     basesCovered.add(base);
-                 }
-             }
-             // Removal is deferred until the iteration over baseAcks has finished.
-             for (Integer b : basesCovered) {
-                 baseAcks.remove(b);
-             }
-             basesCovered.clear();
-         }
-     }
-
-     /**
-      * Counts successive runs in which the handler's average persistence delay exceeds the
-      * policy maximum and sends a storage report once the count passes the limit.
-      * The report carries the delay that was actually measured and compared, rather than
-      * re-reading a separate value from the buffer.
-      */
-     private void checkLatencyViolation() {
-         long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
-         if (avgDelayPersistence > policyAccessor.getMaxDelayRecordPersistence()) {
-             countDelayExceeded++;
-             if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
-                 storageReportMessage.reset(0, false, avgDelayPersistence);
-                 feedManager.getFeedMessageService().sendMessage(storageReportMessage);
-             }
-         } else {
-             countDelayExceeded = 0;
-         }
-     }
-
-     /** Records the highest window acked for the intake partition named in {@code message}. */
-     public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
-         maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
-     }
- }
-
- /**
-  * Timer task that periodically samples a buffer's pending work, inflow rate and outflow
-  * rate, optionally logs them, and optionally sends one of the two rates as a feed report
-  * message. When both report flags are set the inflow rate is the one reported.
-  */
- public static class LogInputOutputRateTask extends TimerTask {
-
-     private final MonitoredBuffer mBuffer;
-     private final boolean log;
-     private final boolean reportInflow;
-     private final boolean reportOutflow;
-
-     // Both are null when neither inflow nor outflow reporting was requested.
-     private final IFeedMessageService messageService;
-     private final FeedReportMessage message;
-
-     public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
-         this.mBuffer = mBuffer;
-         this.log = log;
-         this.reportInflow = reportInflow;
-         this.reportOutflow = reportOutflow;
-         if (reportInflow || reportOutflow) {
-             ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
-             messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
-             message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
-                     vType, 0);
-         } else {
-             messageService = null;
-             message = null;
-         }
-     }
-
-     @Override
-     public void run() {
-         int pendingWork = mBuffer.getWorkSize();
-         int outflowRate = mBuffer.getOutflowRate();
-         int inflowRate = mBuffer.getInflowRate();
-         if (log && LOGGER.isLoggable(Level.INFO)) {
-             LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
-                     + outflowRate + " Pending Work " + pendingWork);
-         }
-         if (messageService == null) {
-             // Log-only task: the constructor created no message, so there is nothing to send.
-             return;
-         }
-         // A non-null service implies reportInflow || reportOutflow; inflow takes precedence.
-         message.reset(reportInflow ? inflowRate : outflowRate);
-         messageService.sendMessage(message);
-     }
- }
-
- /**
-  * Timer task that watches the length of a buffer's input queue and notifies the frame event
-  * callback when the backlog crosses the configured threshold or has drained again.
-  */
- public static class MonitorInputQueueLengthTimerTask extends TimerTask {
-
-     private final MonitoredBuffer mBuffer;
-     private final IFrameEventCallback callback;
-     private final int pendingWorkThreshold;
-     private final int maxSuccessiveThresholdPeriods;
-     private FrameEvent lastEvent = FrameEvent.NO_OP;
-     private int pendingWorkExceedCount = 0;
-
-     public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
-         this.mBuffer = mBuffer;
-         this.callback = callback;
-         final AsterixFeedProperties feedProperties = mBuffer.getInputHandler().getFeedManager()
-                 .getAsterixFeedProperties();
-         this.pendingWorkThreshold = feedProperties.getPendingWorkThreshold();
-         this.maxSuccessiveThresholdPeriods = feedProperties.getMaxSuccessiveThresholdPeriod();
-     }
-
-     @Override
-     public void run() {
-         final int backlog = mBuffer.getWorkSize();
-         if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
-             return;
-         }
-
-         if (lastEvent == FrameEvent.PENDING_WORK_THRESHOLD_REACHED) {
-             // Congestion was signalled earlier: report completion once the backlog has
-             // dropped to at most half of the threshold.
-             if (((backlog * 1.0) / pendingWorkThreshold) <= 0.5) {
-                 raise(FrameEvent.PENDING_WORK_DONE);
-             }
-             return;
-         }
-
-         final boolean watchingForCongestion = lastEvent == FrameEvent.NO_OP
-                 || lastEvent == FrameEvent.PENDING_WORK_DONE
-                 || lastEvent == FrameEvent.FINISHED_PROCESSING_SPILLAGE;
-         if (!watchingForCongestion) {
-             // FINISHED_PROCESSING and any other event: nothing to do.
-             return;
-         }
-
-         if (backlog > pendingWorkThreshold) {
-             // NOTE(review): the counter is not cleared when a run finds the backlog below the
-             // threshold, so the periods counted here need not be consecutive -- confirm intent.
-             pendingWorkExceedCount++;
-             if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
-                 pendingWorkExceedCount = 0;
-                 raise(FrameEvent.PENDING_WORK_THRESHOLD_REACHED);
-             }
-         } else if (backlog == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
-             raise(FrameEvent.PENDING_WORK_DONE);
-         }
-     }
-
-     /** Remembers {@code event} as the last event and forwards it to the callback. */
-     private void raise(FrameEvent event) {
-         lastEvent = event;
-         callback.frameEvent(lastEvent);
-     }
- }
-
- /**
-  * A timer task that compares the processing rate with the inflow rate to look for a
-  * possibility to scale in, that is, to reduce the cardinality of the compute operator.
-  * <p>
-  * Once a scale-in has been proposed no further proposal is made until
-  * {@link #setNumberOfPartitions(int)} is called.
-  */
- // NOTE(review): run() executes on the timer thread while setNumberOfPartitions is public;
- // nPartitions and proposedChange are plain fields -- confirm whether cross-thread
- // visibility matters for the callers.
- public static class MonitoreProcessRateTimerTask extends TimerTask {
-
-     private final MonitoredBuffer mBuffer;
-     private final IFeedManager feedManager;
-     private int nPartitions;
-     private ScaleInReportMessage sMessage;
-     private boolean proposedChange;
-
-     public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
-             FeedConnectionId connectionId, int nPartitions) {
-         this.mBuffer = mBuffer;
-         this.feedManager = feedManager;
-         this.nPartitions = nPartitions;
-         this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
-         this.proposedChange = false;
-     }
-
-     public int getNumberOfPartitions() {
-         return nPartitions;
-     }
-
-     /** Updates the partition count and re-enables scale-in proposals. */
-     public void setNumberOfPartitions(int nPartitions) {
-         this.nPartitions = nPartitions;
-         proposedChange = false;
-         if (LOGGER.isLoggable(Level.INFO)) {
-             LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
-         }
-     }
-
-     /**
-      * Proposes a scale-in when both rates are positive, the inflow rate is below the
-      * processing rate, and the cardinality needed for the inflow is at least 25% smaller
-      * than the current one.
-      */
-     @Override
-     public void run() {
-         if (!proposedChange) {
-             int inflowRate = mBuffer.getInflowRate();
-             int procRate = mBuffer.getProcessingRate();
-             if (inflowRate > 0 && procRate > 0) {
-                 if (inflowRate < procRate) {
-                     // Widen to double before multiplying: nPartitions * inflowRate computed
-                     // as int could overflow for large rates before the division happened.
-                     int possibleCardinality = (int) Math.ceil(nPartitions * (double) inflowRate / procRate);
-                     if (possibleCardinality < nPartitions
-                             && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= 0.25)) {
-                         sMessage.reset(nPartitions, possibleCardinality);
-                         feedManager.getFeedMessageService().sendMessage(sMessage);
-                         proposedChange = true;
-                         if (LOGGER.isLoggable(Level.INFO)) {
-                             LOGGER.info("Proposed scale-in " + sMessage);
-                         }
-                     }
-                 } else {
-                     if (LOGGER.isLoggable(Level.INFO)) {
-                         LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
-                                 + ")");
-                     }
-                 }
-             }
-         } else {
-             if (LOGGER.isLoggable(Level.WARNING)) {
-                 LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
-             }
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
deleted file mode 100644
index d3919b5..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoad.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-
- /**
-  * Tracks how many runtimes are attributed to a node. Instances order by that count,
-  * ascending, so the least loaded node sorts first.
-  */
- public class NodeLoad implements Comparable<NodeLoad> {
-
-     private final String nodeId;
-
-     private int nRuntimes;
-
-     /** Creates the load record for {@code nodeId} with a runtime count of zero. */
-     public NodeLoad(String nodeId) {
-         this.nodeId = nodeId;
-         this.nRuntimes = 0;
-     }
-
-     /** Increments the runtime count by one. */
-     public void addLoad() {
-         nRuntimes++;
-     }
-
-     /**
-      * Decrements the runtime count by one.
-      *
-      * @param runtimeType not used; every runtime type counts the same
-      */
-     public void removeLoad(FeedRuntimeType runtimeType) {
-         nRuntimes--;
-     }
-
-     /**
-      * Orders by runtime count. Uses {@link Integer#compare(int, int)} instead of
-      * subtraction, which overflows (and flips sign) when the counts are far apart.
-      */
-     @Override
-     public int compareTo(NodeLoad o) {
-         if (this == o) {
-             return 0;
-         }
-         return Integer.compare(nRuntimes, o.getnRuntimes());
-     }
-
-     public String getNodeId() {
-         return nodeId;
-     }
-
-     public int getnRuntimes() {
-         return nRuntimes;
-     }
-
-     public void setnRuntimes(int nRuntimes) {
-         this.nRuntimes = nRuntimes;
-     }
-
- }
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
deleted file mode 100644
index bfddcf6..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReport.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.util.FeedConstants;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Snapshot of a node's load: CPU load, used heap and number of feed runtimes.
- * <p>
- * Identity ({@link #equals(Object)} / {@link #hashCode()}) is based on the node id only, whereas
- * the natural ordering compares the runtime count and then the CPU load. The ordering is
- * therefore <em>not</em> consistent with {@code equals}.
- */
-public class NodeLoadReport implements Comparable<NodeLoadReport> {
-
-    private final String nodeId;
-    private float cpuLoad;
-    private double usedHeap;
-    private int nRuntimes;
-
-    public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
-        this.nodeId = nodeId;
-        this.cpuLoad = cpuLoad;
-        this.usedHeap = usedHeap;
-        this.nRuntimes = nRuntimes;
-    }
-
-    /**
-     * Builds a report from its JSON representation.
-     *
-     * @param obj
-     *            JSON object holding the {@code NODE_ID}, {@code CPU_LOAD}, {@code HEAP_USAGE} and
-     *            {@code N_RUNTIMES} entries named in {@code FeedConstants.MessageConstants}
-     * @return the decoded report
-     * @throws JSONException
-     *             if one of the expected entries cannot be read
-     */
-    public static NodeLoadReport read(JSONObject obj) throws JSONException {
-        return new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
-                (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
-                (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
-                obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
-    }
-
-    /** Two reports are equal when they describe the same node id. */
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof NodeLoadReport)) {
-            return false;
-        }
-        return ((NodeLoadReport) o).nodeId.equals(nodeId);
-    }
-
-    @Override
-    public int hashCode() {
-        return nodeId.hashCode();
-    }
-
-    /**
-     * Orders reports by ascending runtime count, breaking ties by ascending CPU load.
-     *
-     * @param o
-     *            the report to compare against
-     * @return a negative, zero or positive value following the {@link Comparable} contract
-     */
-    @Override
-    public int compareTo(NodeLoadReport o) {
-        // Integer.compare avoids the overflow a plain subtraction can produce.
-        int byRuntimes = Integer.compare(nRuntimes, o.getnRuntimes());
-        if (byRuntimes != 0) {
-            return byRuntimes;
-        }
-        // Float.compare keeps fractional differences; casting the difference to int would
-        // truncate every gap smaller than 1.0 to "equal".
-        return Float.compare(cpuLoad, o.cpuLoad);
-    }
-
-    public float getCpuLoad() {
-        return cpuLoad;
-    }
-
-    public void setCpuLoad(float cpuLoad) {
-        this.cpuLoad = cpuLoad;
-    }
-
-    public double getUsedHeap() {
-        return usedHeap;
-    }
-
-    public void setUsedHeap(double usedHeap) {
-        this.usedHeap = usedHeap;
-    }
-
-    public int getnRuntimes() {
-        return nRuntimes;
-    }
-
-    public void setnRuntimes(int nRuntimes) {
-        this.nRuntimes = nRuntimes;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
deleted file mode 100644
index f651935..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NodeLoadReportService.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.OperatingSystemMXBean;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.feed.api.IFeedService;
-import org.apache.asterix.external.feed.message.NodeReportMessage;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-/**
- * Feed service that samples the local node's load on a timer and hands a
- * {@link NodeReportMessage} to the feed message service whenever the load changed noticeably
- * since the last message that was sent.
- */
-public class NodeLoadReportService implements IFeedService {
-
-    /** Period between two samples; passed to {@link Timer#schedule(TimerTask, long, long)} (milliseconds). */
-    private static final int NODE_LOAD_REPORT_FREQUENCY = 2000;
-    /** Relative change of the CPU load above which a new report is sent. */
-    private static final float CPU_CHANGE_THRESHOLD = 0.2f;
-    /** Relative change of the heap usage ratio above which a new report is sent. */
-    private static final float HEAP_CHANGE_THRESHOLD = 0.4f;
-
-    private final NodeLoadReportTask task;
-    private final Timer timer;
-
-    public NodeLoadReportService(String nodeId, IFeedManager feedManager) {
-        this.task = new NodeLoadReportTask(nodeId, feedManager);
-        this.timer = new Timer();
-    }
-
-    /** Starts sampling immediately and then every {@code NODE_LOAD_REPORT_FREQUENCY} milliseconds. */
-    @Override
-    public void start() throws Exception {
-        timer.schedule(task, 0, NODE_LOAD_REPORT_FREQUENCY);
-    }
-
-    /** Cancels the sampling timer; a cancelled {@link Timer} cannot be started again. */
-    @Override
-    public void stop() {
-        timer.cancel();
-    }
-
-    private static class NodeLoadReportTask extends TimerTask {
-
-        private final IFeedManager feedManager;
-        /** Holds the values of the last report that was sent; reused for every send. */
-        private final NodeReportMessage message;
-        private final IFeedMessageService messageService;
-
-        private static final OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
-        private static final MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
-
-        public NodeLoadReportTask(String nodeId, IFeedManager feedManager) {
-            this.feedManager = feedManager;
-            this.message = new NodeReportMessage(0.0f, 0L, 0);
-            this.messageService = feedManager.getFeedMessageService();
-        }
-
-        @Override
-        public void run() {
-            List<FeedRuntimeId> runtimeIds = feedManager.getFeedConnectionManager().getRegisteredRuntimes();
-            int nRuntimes = runtimeIds.size();
-            double cpuLoad = getCpuLoad();
-            double usedHeap = getUsedHeap();
-            if (sendMessage(nRuntimes, cpuLoad, usedHeap)) {
-                message.reset(cpuLoad, usedHeap, nRuntimes);
-                messageService.sendMessage(message);
-            }
-        }
-
-        /**
-         * Decides whether the current sample differs enough from the last sent report.
-         *
-         * @return {@code true} when the runtime count changed or when the CPU load / heap usage
-         *         moved by more than its threshold relative to the last reported value
-         */
-        private boolean sendMessage(int nRuntimes, double cpuLoad, double usedHeap) {
-            boolean changeInCpu = relativeChange(cpuLoad, message.getCpuLoad()) > CPU_CHANGE_THRESHOLD;
-            boolean changeInUsedHeap = relativeChange(usedHeap, message.getUsedHeap()) > HEAP_CHANGE_THRESHOLD;
-            boolean changeInRuntimeSize = nRuntimes != message.getnRuntimes();
-            return changeInCpu || changeInUsedHeap || changeInRuntimeSize;
-        }
-
-        /**
-         * Computes {@code |current - previous| / |previous|}.
-         * <p>
-         * The magnitude of the baseline is used because the baseline can be negative
-         * ({@code getSystemLoadAverage()} returns a negative value when the load average is not
-         * available); dividing by the signed value would yield a negative ratio that can never
-         * exceed a threshold. A zero baseline reports no change when the value is still zero
-         * and an unbounded change otherwise.
-         */
-        private static double relativeChange(double current, double previous) {
-            double delta = Math.abs(current - previous);
-            if (delta == 0) {
-                return 0;
-            }
-            double baseline = Math.abs(previous);
-            return baseline == 0 ? Double.POSITIVE_INFINITY : delta / baseline;
-        }
-
-        private double getCpuLoad() {
-            return osBean.getSystemLoadAverage();
-        }
-
-        /** Returns the used heap as a fraction of the maximum heap reported by the JVM. */
-        private double getUsedHeap() {
-            return ((double) memBean.getHeapMemoryUsage().getUsed()) / memBean.getHeapMemoryUsage().getMax();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
deleted file mode 100644
index ec95371..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/Series.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-
-/**
- * Base class of a metric series: couples a metric type with a running sum of reported values.
- * Subclasses decide how a value is folded into the series and what a reset clears.
- */
-public abstract class Series {
-
-    /** Kind of metric this series represents; fixed at construction. */
-    protected final MetricType type;
-    /** Sum maintained by subclasses as values are added. */
-    protected int runningSum;
-
-    /**
-     * @param type
-     *            the kind of metric tracked by this series
-     */
-    public Series(MetricType type) {
-        this.type = type;
-    }
-
-    /** @return the kind of metric tracked by this series */
-    public MetricType getType() {
-        return this.type;
-    }
-
-    /** @return the current value of the running sum */
-    public int getRunningSum() {
-        return this.runningSum;
-    }
-
-    /**
-     * Adds a newly reported value to the series.
-     *
-     * @param value
-     *            the reported value
-     */
-    public abstract void addValue(int value);
-
-    /** Clears the subclass-specific measurement state. */
-    public abstract void reset();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
deleted file mode 100644
index 6182753..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesAvg.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-
-/**
- * Series that reports the integer average of the values added since the last {@link #reset()}.
- * <p>
- * {@link #getRunningSum()} keeps returning the sum of all accepted values, as before; the
- * average is computed relative to the sum observed at the last reset.
- */
-public class SeriesAvg extends Series {
-
-    /** Number of values accepted since the last reset. */
-    private int count;
-    /** Value of {@code runningSum} at the last reset; subtracted so old values do not skew the average. */
-    private int sumAtReset;
-
-    public SeriesAvg() {
-        super(MetricType.AVG);
-    }
-
-    /**
-     * @return the integer average of the values accepted since the last reset, or {@code 0} when
-     *         no value has been accepted yet (instead of failing with a division by zero)
-     */
-    public synchronized int getAvg() {
-        if (count == 0) {
-            return 0;
-        }
-        return (runningSum - sumAtReset) / count;
-    }
-
-    /**
-     * Adds a value to the series; negative values are ignored.
-     *
-     * @param value
-     *            the reported value
-     */
-    @Override
-    public synchronized void addValue(int value) {
-        if (value < 0) {
-            return;
-        }
-        runningSum += value;
-        count++;
-    }
-
-    /** Starts a new averaging window. */
-    @Override
-    public synchronized void reset() {
-        count = 0;
-        // Remember the sum instead of clearing it: getRunningSum() keeps its previous meaning
-        // while the average only covers values added after this point.
-        sumAtReset = runningSum;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
deleted file mode 100644
index 91eea87..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/SeriesRate.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.MetricType;
-
-/**
- * Series that reports a rate (values per second), recomputed by a timer every
- * {@code REFRESH_MEASUREMENT} milliseconds from the growth of the running sum.
- * <p>
- * The timer is started by the constructor and must be stopped with {@link #end()}.
- */
-public class SeriesRate extends Series {
-
-    private static final long REFRESH_MEASUREMENT = 5000; // 5 seconds
-
-    /** Written by the timer thread and read by callers, hence volatile. */
-    private volatile int rate;
-    private Timer timer;
-    private RateComputingTask task;
-
-    public SeriesRate() {
-        super(MetricType.RATE);
-        begin();
-    }
-
-    /** @return the rate computed at the last timer tick, in values per second */
-    public int getRate() {
-        return rate;
-    }
-
-    /**
-     * Adds a value to the running sum; negative values are ignored.
-     *
-     * @param value
-     *            the reported value
-     */
-    @Override
-    public synchronized void addValue(int value) {
-        if (value < 0) {
-            return;
-        }
-        runningSum += value;
-    }
-
-    /** Starts the rate-computing timer if it has not been created yet. */
-    public void begin() {
-        if (timer == null) {
-            timer = new Timer();
-            task = new RateComputingTask(this);
-            timer.scheduleAtFixedRate(task, 0, REFRESH_MEASUREMENT);
-        }
-    }
-
-    /** Cancels the rate-computing timer. */
-    public void end() {
-        if (timer != null) {
-            timer.cancel();
-        }
-    }
-
-    /** Clears the reported rate and restarts the measurement from the current running sum. */
-    @Override
-    public void reset() {
-        rate = 0;
-        if (task != null) {
-            task.reset();
-        }
-    }
-
-    private class RateComputingTask extends TimerTask {
-
-        /** Running sum observed at the previous tick (or at the last reset). */
-        private int lastMeasured = 0;
-        private final SeriesRate series;
-
-        public RateComputingTask(SeriesRate series) {
-            this.series = series;
-        }
-
-        // run() executes on the timer thread while reset() is called by users of the series;
-        // both are synchronized so they agree on lastMeasured.
-        @Override
-        public synchronized void run() {
-            int currentValue = series.getRunningSum();
-            int delta = currentValue - lastMeasured;
-            // Multiply in long: delta * 1000 overflows int once more than ~2.1 million
-            // units arrive within one refresh period.
-            rate = (int) ((delta * 1000L) / REFRESH_MEASUREMENT);
-            lastMeasured = currentValue;
-        }
-
-        public synchronized void reset() {
-            // The running sum is not cleared by a reset, so the baseline must be the current
-            // sum; a baseline of 0 would make the next tick report the lifetime total as a rate.
-            lastMeasured = series.getRunningSum();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
deleted file mode 100644
index 9db930e..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.watch;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.feed.api.IExceptionHandler;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector;
-import org.apache.asterix.external.feed.api.IFrameEventCallback;
-import org.apache.asterix.external.feed.api.IFramePostProcessor;
-import org.apache.asterix.external.feed.api.IFramePreprocessor;
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
-import org.apache.asterix.external.feed.dataflow.StorageFrameHandler;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * {@link MonitoredBuffer} variant whose frame pre/post processors handle acking and
- * time tracking: when either is enabled by the feed policy, each processed tuple has its intake
- * timestamp read and a store timestamp written into the frame, and average delays are logged.
- */
-public class StorageSideMonitoredBuffer extends MonitoredBuffer {
-
- // Period handed to timer.scheduleAtFixedRate for the storage time-tracking task
- // (presumably milliseconds, as for java.util.Timer - confirm against MonitoredBuffer.timer).
- private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000;
-
- // Initialised from policyAccessor.atleastOnceSemantics(); can be changed later via setAcking.
- private boolean ackingEnabled;
- // Initialised from policyAccessor.isTimeTrackingEnabled(); fixed for the buffer's lifetime.
- private final boolean timeTrackingEnabled;
-
- /**
- * Forwards all arguments to the {@link MonitoredBuffer} constructor, reads the acking and
- * time-tracking flags from the policy and, if either is set, creates the storage frame handler
- * and schedules the storage timer task at a fixed rate with no initial delay.
- */
- public StorageSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
- IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
- IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
- FeedPolicyAccessor policyAccessor) {
- super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
- exceptionHandler, callback, nPartitions, policyAccessor);
- timeTrackingEnabled = policyAccessor.isTimeTrackingEnabled();
- ackingEnabled = policyAccessor.atleastOnceSemantics();
- if (ackingEnabled || timeTrackingEnabled) {
- // NOTE(review): within this class the handler is only created here; if acking is switched
- // on later through setAcking(true) the pre-processor may find it unset - confirm.
- storageFromeHandler = new StorageFrameHandler();
- this.storageTimeTrackingRateTask = new MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask(this,
- inputHandler.getFeedManager(), connectionId, runtimeId.getPartition(), policyAccessor,
- storageFromeHandler);
- this.timer.scheduleAtFixedRate(storageTimeTrackingRateTask, 0, STORAGE_TIME_TRACKING_FREQUENCY);
- }
- }
-
- @Override
- protected boolean monitorProcessingRate() {
- return false;
- }
-
- @Override
- protected boolean logInflowOutflowRate() {
- return true;
- }
-
- /**
- * Returns a new pre-processor that, while acking is enabled, passes every frame to the
- * storage frame handler to update its tracking information.
- */
- @Override
- public IFramePreprocessor getFramePreProcessor() {
- return new IFramePreprocessor() {
-
- @Override
- public void preProcess(ByteBuffer frame) {
- try {
- if (ackingEnabled) {
- storageFromeHandler.updateTrackingInformation(frame, inflowFta);
- }
- } catch (Exception e) {
- // Failures are only printed to stderr; they are not rethrown to the caller.
- e.printStackTrace();
- }
- }
- };
- }
-
- /**
- * Returns a new post-processor that, when acking or time tracking is enabled, reads each
- * tuple's intake timestamp, accumulates the delay since intake into one of three windows,
- * stamps the tuple with the current time as its store timestamp, and logs the per-frame
- * average delays.
- */
- @Override
- protected IFramePostProcessor getFramePostProcessor() {
- return new IFramePostProcessor() {
-
- // Upper bounds, relative to the first intake timestamp seen, of the "normal" and "high"
- // windows; anything at or beyond HIGH_WINDOW_LIMIT falls in the "low" window.
- // Presumably milliseconds, since delays are derived from System.currentTimeMillis().
- private static final long NORMAL_WINDOW_LIMIT = 400 * 1000;
- private static final long HIGH_WINDOW_LIMIT = 800 * 1000;
-
- // Sum of delays accumulated per window for the frame being processed.
- private long delayNormalWindow = 0;
- private long delayHighWindow = 0;
- private long delayLowWindow = 0;
-
- // Number of tuples accumulated per window for the frame being processed.
- private int countNormalWindow;
- private int countHighWindow;
- private int countLowWindow;
-
- // Intake timestamp of the first tuple ever processed; 0 means "not seen yet".
- private long beginIntakeTimestamp = 0;
-
- @Override
- public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
- if (ackingEnabled || timeTrackingEnabled) {
- int nTuples = frameAccessor.getTupleCount();
- long intakeTimestamp;
- long currentTime = System.currentTimeMillis();
- for (int i = 0; i < nTuples; i++) {
- // The offsets below walk a fixed record layout inside the frame.
- // NOTE(review): the layout (open-part offset at byte 6, 8 bytes per open field,
- // field-name length + 2 + 1 bytes before each value) is assumed from this
- // arithmetic only - confirm against the record serialization format.
- int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
- int openPartOffsetOrig = frame.getInt(recordStart + 6);
- int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
- int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
- + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
-
- int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2)
- + 1;
-
- int intakeTimestampValueOffset = partitionOffset + 4
- + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2) + 1;
- intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
- if (beginIntakeTimestamp == 0) {
- beginIntakeTimestamp = intakeTimestamp;
- LOGGER.warning("Begin Timestamp: " + beginIntakeTimestamp);
- }
-
- updateRunningAvg(intakeTimestamp, currentTime);
-
- // Overwrite the store-timestamp value in place with the current time.
- int storeTimestampValueOffset = intakeTimestampValueOffset + 8
- + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
- frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
- }
- // The accumulators are logged and cleared after every frame, so the logged
- // averages cover the tuples of this frame only.
- logRunningAvg();
- resetRunningAvg();
- }
- }
-
- // Adds the tuple's delay (currentTime - intakeTimestamp) to the window selected by how
- // long after the first observed intake timestamp the tuple was ingested.
- private void updateRunningAvg(long intakeTimestamp, long currentTime) {
- long diffTimestamp = intakeTimestamp - beginIntakeTimestamp;
- long delay = (currentTime - intakeTimestamp);
- if (diffTimestamp < NORMAL_WINDOW_LIMIT) {
- delayNormalWindow += delay;
- countNormalWindow++;
- } else if (diffTimestamp < HIGH_WINDOW_LIMIT) {
- delayHighWindow += delay;
- countHighWindow++;
- } else {
- delayLowWindow += delay;
- countLowWindow++;
- }
- }
-
- // Clears all per-window delay sums and tuple counts.
- private void resetRunningAvg() {
- delayNormalWindow = 0;
- countNormalWindow = 0;
- delayHighWindow = 0;
- countHighWindow = 0;
- delayLowWindow = 0;
- countLowWindow = 0;
- }
-
- // Logs the average delay of each window that has both a non-zero count and a non-zero
- // delay sum; windows are identified as 0 (normal), 1 (high) and 2 (low).
- private void logRunningAvg() {
- if (countNormalWindow != 0 && delayNormalWindow != 0) {
- LOGGER.warning("Window:" + 0 + ":" + "Avg Travel_Time:" + (delayNormalWindow / countNormalWindow));
- }
- if (countHighWindow != 0 && delayHighWindow != 0) {
- LOGGER.warning("Window:" + 1 + ":" + "Avg Travel_Time:" + (delayHighWindow / countHighWindow));
- }
- if (countLowWindow != 0 && delayLowWindow != 0) {
- LOGGER.warning("Window:" + 2 + ":" + "Avg Travel_Time:" + (delayLowWindow / countLowWindow));
- }
- }
-
- };
- }
-
- public boolean isAckingEnabled() {
- return ackingEnabled;
- }
-
- // Only flips the flag; it does not create the storage frame handler or schedule the timer task.
- public void setAcking(boolean ackingEnabled) {
- this.ackingEnabled = ackingEnabled;
- }
-
- public boolean isTimeTrackingEnabled() {
- return timeTrackingEnabled;
- }
-
- @Override
- protected boolean monitorInputQueueLength() {
- return true;
- }
-
- @Override
- protected boolean reportOutflowRate() {
- return true;
- }
-
- @Override
- protected boolean reportInflowRate() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index ce1d893..26f8654 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -116,14 +116,14 @@ public class SocketServerInputStream extends AsterixInputStream {
}
socket = null;
} catch (IOException e) {
- hde = ExternalDataExceptionUtils.suppress(hde, e);
+ hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
}
try {
if (server != null) {
server.close();
}
} catch (IOException e) {
- hde = ExternalDataExceptionUtils.suppress(hde, e);
+ hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
} finally {
server = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index 9485b77..36098ee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -19,17 +19,14 @@
package org.apache.asterix.external.operators;
import java.util.Map;
-import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedManager;
import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.feed.management.FeedManager;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -48,7 +45,6 @@ import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescri
public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
/** The type associated with the ADM data output from (the feed adapter OR the compute operator) */
private final IAType outputType;
@@ -59,9 +55,6 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
/** Map representation of policy parameters */
private final Map<String, String> feedPolicyProperties;
- /** The (singleton) instance of {@code IFeedIngestionManager} **/
- private IFeedSubscriptionManager subscriptionManager;
-
/** The source feed from which the feed derives its data from. **/
private final FeedId sourceFeedId;
@@ -72,7 +65,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
FeedRuntimeType subscriptionLocation) {
super(spec, 0, 1);
- recordDescriptors[0] = rDesc;
+ this.recordDescriptors[0] = rDesc;
this.outputType = atype;
this.connectionId = feedConnectionId;
this.feedPolicyProperties = feedPolicyProperties;
@@ -84,22 +77,11 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- this.subscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager()).getFeedSubscriptionManager();
- ISubscribableRuntime sourceRuntime = null;
- SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
- subscriptionLocation, partition);
- switch (subscriptionLocation) {
- case INTAKE:
- sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
- break;
- case COMPUTE:
- sourceRuntime = subscriptionManager.getSubscribableRuntime(feedSubscribableRuntimeId);
- break;
- default:
- throw new HyracksDataException("Can't subscirbe to FeedRuntime with Type: " + subscriptionLocation);
- }
+ FeedManager feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject()).getFeedManager();
+ FeedRuntimeId sourceRuntimeId =
+ new FeedRuntimeId(sourceFeedId, subscriptionLocation, partition, FeedRuntimeId.DEFAULT_TARGET_ID);
+ ISubscribableRuntime sourceRuntime = feedManager.getSubscribableRuntime(sourceRuntimeId);
return new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId, feedPolicyProperties, partition,
nPartitions, sourceRuntime);
}
@@ -124,10 +106,6 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato
return sourceFeedId;
}
- private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
- return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
- }
-
public FeedRuntimeType getSubscriptionLocation() {
return subscriptionLocation;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 178d2d5..aeea6ba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -19,29 +19,21 @@
package org.apache.asterix.external.operators;
import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.dataflow.CollectTransformFeedFrameWriter;
-import org.apache.asterix.external.feed.dataflow.FeedCollectRuntimeInputHandler;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.SyncFeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.management.FeedManager;
import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -50,19 +42,14 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNod
* The first operator in a collect job in a feed.
*/
public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
- private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
private final int partition;
private final FeedConnectionId connectionId;
private final Map<String, String> feedPolicy;
private final FeedPolicyAccessor policyAccessor;
- private final IFeedManager feedManager;
+ private final FeedManager feedManager;
private final ISubscribableRuntime sourceRuntime;
private final IHyracksTaskContext ctx;
- private final int nPartitions;
-
- private RecordDescriptor outputRecordDescriptor;
- private FeedRuntimeInputHandler inputSideHandler;
private CollectionRuntime collectRuntime;
public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
@@ -70,139 +57,38 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
ISubscribableRuntime sourceRuntime) {
this.ctx = ctx;
this.partition = partition;
- this.nPartitions = nPartitions;
this.connectionId = feedConnectionId;
this.sourceRuntime = sourceRuntime;
this.feedPolicy = feedPolicy;
this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
- this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+ this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getFeedManager();
}
@Override
public void initialize() throws HyracksDataException {
try {
- outputRecordDescriptor = recordDesc;
- switch (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
- case INTAKE:
- handleCompleteConnection();
- // Notify CC that Collection started
- ctx.sendApplicationMessageToCC(
- new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()),
- null);
- break;
- case COMPUTE:
- handlePartialConnection();
- break;
- default:
- throw new IllegalStateException("Invalid source type "
- + ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType());
- }
- State state = collectRuntime.waitTillCollectionOver();
- if (state.equals(State.FINISHED)) {
- feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
- collectRuntime.getRuntimeId());
- writer.close();
- inputSideHandler.close();
- } else if (state.equals(State.HANDOVER)) {
- inputSideHandler.setMode(Mode.STALL);
- writer.close();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
- + " and the output writer " + writer + " has been closed ");
- }
+ FeedRuntimeId runtimeId = new FeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.COLLECT, partition,
+ FeedRuntimeId.DEFAULT_TARGET_ID);
+ // Does this collector have a handler?
+ FrameTupleAccessor tAccessor = new FrameTupleAccessor(recordDesc);
+ if (policyAccessor.bufferingEnabled()) {
+ writer = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, policyAccessor, tAccessor,
+ feedManager.getFeedMemoryManager());
+ } else {
+ writer = new SyncFeedRuntimeInputHandler(ctx, writer, tAccessor);
}
- } catch (InterruptedException ie) {
- handleInterruptedException(ie);
+ collectRuntime = new CollectionRuntime(connectionId, runtimeId, sourceRuntime, feedPolicy, ctx,
+ new FeedFrameCollector(policyAccessor, writer, connectionId));
+ feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+ sourceRuntime.subscribe(collectRuntime);
+ // Notify CC that Collection started
+ ctx.sendApplicationMessageToCC(
+ new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()), null);
+ collectRuntime.waitTillCollectionOver();
+ feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
} catch (Exception e) {
- e.printStackTrace();
throw new HyracksDataException(e);
}
}
-
- private void handleCompleteConnection() throws Exception {
- FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
- FeedRuntimeId.DEFAULT_OPERAND_ID);
- collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
- runtimeId);
- if (collectRuntime == null) {
- beginNewFeed(runtimeId);
- } else {
- reviveOldFeed();
- }
- }
-
- private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
- writer.open();
- IFrameWriter outputSideWriter = writer;
- if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()
- .equals(FeedRuntimeType.COMPUTE)) {
- outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
- connectionId);
- this.recordDesc = sourceRuntime.getRecordDescriptor();
- }
-
- FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
- inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter,
- policyAccessor, false, tupleAccessor, recordDesc, feedManager, nPartitions);
-
- collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
- sourceRuntime, feedPolicy, ctx);
- feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
- sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
- }
-
- private void reviveOldFeed() throws HyracksDataException {
- writer.open();
- collectRuntime.getFrameCollector().setState(State.ACTIVE);
- inputSideHandler = collectRuntime.getInputHandler();
-
- IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
- if (innerWriter instanceof CollectTransformFeedFrameWriter) {
- ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
- } else {
- inputSideHandler.setCoreOperator(writer);
- }
-
- inputSideHandler.setMode(Mode.PROCESS);
- }
-
- private void handlePartialConnection() throws Exception {
- FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
- FeedRuntimeId.DEFAULT_OPERAND_ID);
- writer.open();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
- }
- IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
- outputRecordDescriptor, connectionId);
-
- inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
- new FrameTupleAccessor(recordDesc), recordDesc, feedManager, nPartitions);
-
- collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
- feedPolicy, ctx);
- feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
- recordDesc = sourceRuntime.getRecordDescriptor();
- sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
- }
-
- private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
- if (policyAccessor.continueOnHardwareFailure()) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
- + " until failure is resolved");
- }
- inputSideHandler.setMode(Mode.STALL);
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
- + " as feed is not configured to handle failures");
- }
- feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
- writer.close();
- throw new HyracksDataException(ie);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index db11caa..b1fd7a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -21,16 +21,10 @@ package org.apache.asterix.external.operators;
import java.util.Map;
import java.util.logging.Logger;
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.feed.api.IFeed;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.IngestionRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -99,27 +93,18 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
- IFeedSubscriptionManager feedSubscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager())
- .getFeedSubscriptionManager();
- SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
- partition);
- IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
- .getSubscribableRuntime(feedIngestionId);
if (adaptorFactory == null) {
try {
- adaptorFactory = createExtenralAdapterFactory(ctx, partition);
+ adaptorFactory = createExternalAdapterFactory(ctx, partition);
} catch (Exception exception) {
throw new HyracksDataException(exception);
}
-
}
- return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
- policyAccessor, recordDescProvider, this);
+ return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, policyAccessor,
+ recordDescProvider, this);
}
- private IAdapterFactory createExtenralAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
+ private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
IAdapterFactory adapterFactory = null;
ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
adaptorLibraryName);