You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:11 UTC
[14/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java
new file mode 100644
index 0000000..1130905
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitAckMessage.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+/**
+ * A {@code COMMIT_ACK} feed message carrying a block of commit acknowledgements for one
+ * intake partition of a feed connection.
+ * <p>
+ * In the JSON form produced by {@link #toJSON()} the acknowledgement bytes are
+ * Base64-encoded; {@link #read(JSONObject)} decodes them again.
+ */
+public class FeedTupleCommitAckMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The feed connection (feed + dataset) the acknowledgements belong to. */
+    private final FeedConnectionId connectionId;
+
+    /** The intake partition the acknowledgements were produced for. */
+    private int intakePartition;
+
+    /** Base value reported together with the acknowledgement bytes. */
+    private int base;
+
+    /** Raw acknowledgement bytes; stored and returned by reference (no defensive copy). */
+    private byte[] commitAcks;
+
+    public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
+        super(MessageType.COMMIT_ACK);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    /**
+     * Rebuilds a message from the JSON form written by {@link #toJSON()}.
+     *
+     * @param obj
+     *            JSON object holding the dataverse, feed, dataset, intake partition, base
+     *            and Base64-encoded acknowledgement entries
+     * @return the decoded message
+     * @throws JSONException
+     *             if a required entry is missing or has the wrong type
+     */
+    public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
+        final String dataverse = obj.getString(FeedConstants.MessageConstants.DATAVERSE);
+        final String feedName = obj.getString(FeedConstants.MessageConstants.FEED);
+        final FeedId feed = new FeedId(dataverse, feedName);
+        final String dataset = obj.getString(FeedConstants.MessageConstants.DATASET);
+        final FeedConnectionId connection = new FeedConnectionId(feed, dataset);
+        final int partition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        final int baseValue = obj.getInt(FeedConstants.MessageConstants.BASE);
+        final String encodedAcks = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
+        return new FeedTupleCommitAckMessage(connection, partition, baseValue,
+                DatatypeConverter.parseBase64Binary(encodedAcks));
+    }
+
+    /**
+     * Renders this message as JSON: message type, dataverse, feed, dataset, intake
+     * partition, base and the Base64-encoded acknowledgement bytes.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        final JSONObject json = new JSONObject();
+        json.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        final FeedId feed = connectionId.getFeedId();
+        json.put(FeedConstants.MessageConstants.DATAVERSE, feed.getDataverse());
+        json.put(FeedConstants.MessageConstants.FEED, feed.getFeedName());
+        json.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        json.put(FeedConstants.MessageConstants.BASE, base);
+        json.put(FeedConstants.MessageConstants.COMMIT_ACKS, DatatypeConverter.printBase64Binary(commitAcks));
+        return json;
+    }
+
+    /** Re-targets this (reusable) message at a new partition, base and acknowledgement block. */
+    public void reset(int intakePartition, int base, byte[] commitAcks) {
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public int getIntakePartition() {
+        return intakePartition;
+    }
+
+    public int getBase() {
+        return base;
+    }
+
+    public void setBase(int base) {
+        this.base = base;
+    }
+
+    /** Returns the internal acknowledgement array itself, not a copy. */
+    public byte[] getCommitAcks() {
+        return commitAcks;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java
new file mode 100644
index 0000000..b861e63
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedTupleCommitResponseMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+/**
+ * A {@code COMMIT_ACK_RESPONSE} feed message: for one intake partition of a feed
+ * connection it reports the {@code maxWindowAcked} value.
+ * <p>
+ * Instances are immutable.
+ */
+public class FeedTupleCommitResponseMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final int intakePartition;
+    private final int maxWindowAcked;
+
+    public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
+        super(MessageType.COMMIT_ACK_RESPONSE);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.maxWindowAcked = maxWindowAcked;
+    }
+
+    /**
+     * Rebuilds a message from the JSON form written by {@link #toJSON()}.
+     *
+     * @param obj
+     *            JSON object holding the dataverse, feed, dataset, intake partition and
+     *            max-window-acked entries
+     * @return the decoded message
+     * @throws JSONException
+     *             if a required entry is missing or has the wrong type
+     */
+    public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
+        final String dataverse = obj.getString(FeedConstants.MessageConstants.DATAVERSE);
+        final String feedName = obj.getString(FeedConstants.MessageConstants.FEED);
+        final FeedId feed = new FeedId(dataverse, feedName);
+        final String dataset = obj.getString(FeedConstants.MessageConstants.DATASET);
+        final FeedConnectionId connection = new FeedConnectionId(feed, dataset);
+        final int partition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        final int windowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
+        return new FeedTupleCommitResponseMessage(connection, partition, windowAcked);
+    }
+
+    /**
+     * Renders this message as JSON: message type, dataverse, feed, dataset, intake
+     * partition and max window acked.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        final JSONObject json = new JSONObject();
+        json.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        final FeedId feed = connectionId.getFeedId();
+        json.put(FeedConstants.MessageConstants.DATAVERSE, feed.getDataverse());
+        json.put(FeedConstants.MessageConstants.FEED, feed.getFeedName());
+        json.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        json.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
+        return json;
+    }
+
+    /** Formats the message as {@code <connectionId>[<intakePartition>](<maxWindowAcked>)}. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        text.append(connectionId);
+        text.append("[").append(intakePartition).append("]");
+        text.append("(").append(maxWindowAcked).append(")");
+        return text.toString();
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public int getIntakePartition() {
+        return intakePartition;
+    }
+
+    public int getMaxWindowAcked() {
+        return maxWindowAcked;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java
new file mode 100644
index 0000000..611f613
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameCollection.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+
+/**
+ * Represents an expandable collection of frames.
+ * <p>
+ * Every added frame is copied into a freshly allocated buffer. Once the collection holds
+ * {@code maxSize} frames, further additions succeed only if the
+ * {@link IFeedMemoryManager} grants an expansion.
+ */
+public class FrameCollection implements IFeedMemoryComponent {
+
+    /** A unique identifier for the feed memory component **/
+    private final int componentId;
+
+    /** A collection of frames (each being a ByteBuffer) **/
+    private final List<ByteBuffer> frames = new LinkedList<ByteBuffer>();
+
+    /** The permitted maximum size, the collection may grow to **/
+    private int maxSize;
+
+    /** The {@link IFeedMemoryManager} for the NodeController **/
+    private final IFeedMemoryManager memoryManager;
+
+    public FrameCollection(int componentId, IFeedMemoryManager memoryManager, int maxSize) {
+        this.componentId = componentId;
+        this.maxSize = maxSize;
+        this.memoryManager = memoryManager;
+    }
+
+    /**
+     * Stores a copy of the given frame.
+     * <p>
+     * The bytes between the frame's current position and its limit are copied into a new
+     * buffer whose capacity equals the frame's capacity. The caller's buffer is left
+     * untouched (its position and limit are not changed).
+     *
+     * @param frame
+     *            the frame to copy
+     * @return {@code true} if the frame was stored; {@code false} if the collection is full
+     *         and the memory manager refused to expand it
+     */
+    public boolean addFrame(ByteBuffer frame) {
+        if (frames.size() == maxSize && !memoryManager.expandMemoryComponent(this)) {
+            return false;
+        }
+        ByteBuffer storageBuffer = ByteBuffer.allocate(frame.capacity());
+        // Read through a duplicate: a relative put(frame) would advance the caller's
+        // position to its limit and make the frame look empty to anyone using it afterwards.
+        storageBuffer.put(frame.duplicate());
+        storageBuffer.flip();
+        frames.add(storageBuffer);
+        return true;
+    }
+
+    /** Returns an iterator over the stored frame copies, in insertion order. */
+    public Iterator<ByteBuffer> getFrameCollectionIterator() {
+        return frames.iterator();
+    }
+
+    /** Returns the number of frames currently held. */
+    @Override
+    public int getTotalAllocation() {
+        return frames.size();
+    }
+
+    @Override
+    public Type getType() {
+        return Type.COLLECTION;
+    }
+
+    @Override
+    public int getComponentId() {
+        return componentId;
+    }
+
+    /** Raises the permitted maximum number of frames by {@code delta}. */
+    @Override
+    public void expand(int delta) {
+        maxSize = maxSize + delta;
+    }
+
+    /**
+     * Drops all stored frames and restores the maximum size to
+     * {@link IFeedMemoryManager#START_COLLECTION_SIZE}.
+     */
+    @Override
+    public void reset() {
+        frames.clear();
+        maxSize = IFeedMemoryManager.START_COLLECTION_SIZE;
+    }
+
+    @Override
+    public String toString() {
+        return "FrameCollection" + "[" + componentId + "]" + "(" + frames.size() + "/" + maxSize + ")";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java
new file mode 100644
index 0000000..937e8f8
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameDistributor.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent.Type;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Distributes the frames of a feed to the {@link FeedFrameCollector}s registered with it.
+ * <p>
+ * The distributor is always in one of the three {@link DistributionMode}s; the mode
+ * changes as collectors are registered and deregistered. A frame is either passed to the
+ * single collector synchronously or wrapped in a {@link DataBucket} taken from a
+ * {@link DataBucketPool} and sent to every registered collector.
+ */
+public class FrameDistributor {
+
+    private static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
+
+    /** Pause, in milliseconds, between attempts to obtain a data bucket from the pool. */
+    private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
+
+    private final IHyracksTaskContext ctx;
+    private final FeedId feedId;
+    private final FeedRuntimeType feedRuntimeType;
+    private final int partition;
+    private final IFeedMemoryManager memoryManager;
+    private final boolean enableSynchronousTransfer;
+    /** A map storing the registered frame readers ({@code FeedFrameCollector}), keyed by their frame writer. **/
+    private final Map<IFrameWriter, FeedFrameCollector> registeredCollectors;
+    private final FrameTupleAccessor fta;
+
+    /** Bucket pool; acquired lazily when a collector is registered for bucket-based transfer. */
+    private DataBucketPool pool;
+    private DistributionMode distributionMode;
+    private boolean spillToDiskRequired = false;
+
+    public enum DistributionMode {
+        /**
+         * A single feed frame collector is registered for receiving tuples.
+         * Tuple is sent via synchronous call, that is no buffering is involved
+         **/
+        SINGLE,
+
+        /**
+         * Multiple feed frame collectors are concurrently registered for
+         * receiving tuples.
+         **/
+        SHARED,
+
+        /**
+         * Feed tuples are not being processed, irrespective of # of registered
+         * feed frame collectors.
+         **/
+        INACTIVE
+    }
+
+    public FrameDistributor(IHyracksTaskContext ctx, FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
+            boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
+            throws HyracksDataException {
+        this.ctx = ctx;
+        this.feedId = feedId;
+        this.feedRuntimeType = feedRuntimeType;
+        this.partition = partition;
+        this.memoryManager = memoryManager;
+        this.enableSynchronousTransfer = enableSynchronousTransfer;
+        this.registeredCollectors = new HashMap<IFrameWriter, FeedFrameCollector>();
+        this.distributionMode = DistributionMode.INACTIVE;
+        this.fta = fta;
+    }
+
+    /**
+     * Sends an end-of-data bucket to every registered collector.
+     * <p>
+     * If no bucket is available the call polls the pool every
+     * {@link #MEMORY_AVAILABLE_POLL_PERIOD} milliseconds until one is. The wait ends
+     * without sending anything if the thread is interrupted (the interrupt flag is
+     * restored) or if no pool has been acquired at all.
+     */
+    public void notifyEndOfFeed() {
+        if (pool == null) {
+            // Without a pool getDataBucket() can never return a bucket; polling would never end.
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("No data bucket pool available, unable to signal end of feed " + this.feedId);
+            }
+            return;
+        }
+        DataBucket bucket = getDataBucket();
+        while (bucket == null) {
+            try {
+                Thread.sleep(MEMORY_AVAILABLE_POLL_PERIOD);
+            } catch (InterruptedException e) {
+                // keep the interruption visible to the caller and give up waiting
+                Thread.currentThread().interrupt();
+                return;
+            }
+            bucket = getDataBucket();
+        }
+        sendEndOfFeedDataBucket(bucket);
+    }
+
+    /** Marks the bucket as end-of-data and sends it to all registered collectors. */
+    private void sendEndOfFeedDataBucket(DataBucket bucket) {
+        bucket.setContentType(DataBucket.ContentType.EOD);
+        nextBucket(bucket);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("End of feed data packet sent " + this.feedId);
+        }
+    }
+
+    /**
+     * Registers a collector and advances the distribution mode:
+     * INACTIVE to SINGLE, SINGLE to SHARED, SHARED stays SHARED.
+     * <p>
+     * The bucket pool is acquired and collectors are started whenever transfer is
+     * bucket-based, i.e. in SHARED mode, or in SINGLE mode when synchronous transfer
+     * is disabled.
+     */
+    public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) {
+        DistributionMode currentMode = distributionMode;
+        switch (distributionMode) {
+            case INACTIVE:
+                if (!enableSynchronousTransfer) {
+                    pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+                    frameCollector.start();
+                }
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                setMode(DistributionMode.SINGLE);
+                break;
+            case SINGLE:
+                pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                // start() is invoked on every registered collector, the existing one included
+                for (FeedFrameCollector reader : registeredCollectors.values()) {
+                    reader.start();
+                }
+                setMode(DistributionMode.SHARED);
+                break;
+            case SHARED:
+                frameCollector.start();
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                break;
+        }
+        evaluateIfSpillIsEnabled();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id "
+                    + feedId);
+        }
+    }
+
+    /**
+     * Closes the given collector and steps the distribution mode down where applicable.
+     *
+     * @throws IllegalStateException
+     *             if the distributor is in INACTIVE mode
+     */
+    public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
+        switch (distributionMode) {
+            case INACTIVE:
+                throw new IllegalStateException("Invalid attempt to deregister frame collector in " + distributionMode
+                        + " mode.");
+            case SHARED:
+                frameCollector.closeCollector();
+                registeredCollectors.remove(frameCollector.getFrameWriter());
+                if (registeredCollectors.size() == 1) {
+                    // one collector left: fall back to SINGLE mode and give the pool back
+                    FeedFrameCollector loneCollector = registeredCollectors.values().iterator().next();
+                    setMode(DistributionMode.SINGLE);
+                    loneCollector.setState(FeedFrameCollector.State.TRANSITION);
+                    loneCollector.closeCollector();
+                    memoryManager.releaseMemoryComponent(pool);
+                }
+                evaluateIfSpillIsEnabled();
+                break;
+            case SINGLE:
+                // NOTE(review): the collector is closed but stays in registeredCollectors here.
+                frameCollector.closeCollector();
+                setMode(DistributionMode.INACTIVE);
+                spillToDiskRequired = false;
+                break;
+
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Deregistered frame reader" + frameCollector + " from feed distributor for " + feedId);
+        }
+    }
+
+    /**
+     * Sets {@code spillToDiskRequired} if it is not set yet and at least one registered
+     * collector's feed policy asks for spilling to disk on congestion. The flag is never
+     * cleared by this method.
+     */
+    public void evaluateIfSpillIsEnabled() {
+        if (spillToDiskRequired) {
+            return;
+        }
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            if (collector.getFeedPolicyAccessor().spillToDiskOnCongestion()) {
+                spillToDiskRequired = true;
+                return;
+            }
+        }
+    }
+
+    /**
+     * Deregisters the collector registered for the given frame writer.
+     *
+     * @return {@code true} if a collector was found for the writer, {@code false} otherwise
+     */
+    public boolean deregisterFrameCollector(IFrameWriter frameWriter) {
+        FeedFrameCollector collector = registeredCollectors.get(frameWriter);
+        if (collector == null) {
+            return false;
+        }
+        deregisterFrameCollector(collector);
+        return true;
+    }
+
+    public synchronized void setMode(DistributionMode mode) {
+        this.distributionMode = mode;
+    }
+
+    public boolean isRegistered(IFrameWriter writer) {
+        return registeredCollectors.get(writer) != null;
+    }
+
+    /**
+     * Routes one frame according to the current distribution mode: dropped in INACTIVE
+     * mode, sent as a data bucket in SHARED mode, and in SINGLE mode either passed
+     * synchronously to the lone collector or sent as a data bucket, depending on the
+     * collector's state and on whether synchronous transfer is enabled.
+     */
+    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        switch (distributionMode) {
+            case INACTIVE:
+                break;
+            case SINGLE:
+                FeedFrameCollector collector = registeredCollectors.values().iterator().next();
+                switch (collector.getState()) {
+                    case HANDOVER:
+                    case ACTIVE:
+                        if (enableSynchronousTransfer) {
+                            collector.nextFrame(frame); // processing is synchronous
+                        } else {
+                            handleDataBucket(frame);
+                        }
+                        break;
+                    case TRANSITION:
+                        handleDataBucket(frame);
+                        break;
+                    case FINISHED:
+                        // The map is keyed by IFrameWriter, so the collector itself is reported
+                        // (an index lookup would always yield null). The frame is dropped; the
+                        // collector stays registered until deregisterFrameCollector() is called,
+                        // because SINGLE mode reads the first entry of the map on every frame.
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Discarding fetched tuples, feed has ended [" + collector + "]"
+                                    + " Feed Id " + feedId + " frame distributor " + this.getFeedRuntimeType());
+                        }
+                        break;
+                }
+                break;
+            case SHARED:
+                handleDataBucket(frame);
+                break;
+        }
+    }
+
+    /** Sends the bucket to every registered collector. */
+    private void nextBucket(DataBucket bucket) {
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            collector.sendMessage(bucket); // asynchronous call
+        }
+    }
+
+    /** Wraps the frame in a pooled bucket and sends it; falls back to the congestion handler if no bucket is available. */
+    private void handleDataBucket(ByteBuffer frame) throws HyracksDataException {
+        DataBucket bucket = getDataBucket();
+        if (bucket == null) {
+            handleFrameDuringMemoryCongestion(frame);
+            return;
+        }
+        bucket.reset(frame);
+        bucket.setDesiredReadCount(registeredCollectors.size());
+        nextBucket(bucket);
+    }
+
+    /**
+     * Invoked when no data bucket could be obtained for a frame.
+     * NOTE(review): currently this only logs a warning; the frame is not buffered,
+     * spilled or retried.
+     */
+    private void handleFrameDuringMemoryCongestion(ByteBuffer frame) throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Unable to allocate memory, will evaluate the need to spill");
+        }
+        // wait till memory is available
+    }
+
+    /**
+     * Takes a bucket from the pool and presets its desired read count to the number of
+     * registered collectors.
+     *
+     * @return the bucket, or {@code null} if there is no pool or the pool returned none
+     */
+    private DataBucket getDataBucket() {
+        if (pool == null) {
+            return null;
+        }
+        DataBucket bucket = pool.getDataBucket();
+        if (bucket != null) {
+            bucket.setDesiredReadCount(registeredCollectors.size());
+        }
+        return bucket;
+    }
+
+    public DistributionMode getMode() {
+        return distributionMode;
+    }
+
+    /**
+     * Shuts the distributor down. When transfer is bucket-based an end-of-data bucket is
+     * sent and the call waits for all collectors to finish; in SINGLE mode the mode is
+     * set to INACTIVE first and the lone collector is disconnected at the end.
+     */
+    public void close() {
+        switch (distributionMode) {
+            case INACTIVE:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("FrameDistributor is " + distributionMode);
+                }
+                break;
+            case SINGLE:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Disconnecting single frame reader in " + distributionMode + " mode " + " for feedId "
+                            + feedId + " " + this.feedRuntimeType);
+                }
+                setMode(DistributionMode.INACTIVE);
+                if (!enableSynchronousTransfer) {
+                    notifyEndOfFeed(); // send EOD Data Bucket
+                    waitForCollectorsToFinish();
+                }
+                registeredCollectors.values().iterator().next().disconnect();
+                break;
+            case SHARED:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Signalling End Of Feed; currently operating in " + distributionMode + " mode");
+                }
+                notifyEndOfFeed(); // send EOD Data Bucket
+                waitForCollectorsToFinish();
+                break;
+        }
+    }
+
+    /**
+     * Blocks until every registered collector is in the FINISHED state, waiting on the
+     * monitor of the collectors view (the same object {@link #getRegisteredCollectors()}
+     * returns). If the thread is interrupted the interrupt flag is restored and the wait
+     * is abandoned.
+     */
+    private void waitForCollectorsToFinish() {
+        // a single reference guarantees that the object locked is the object waited on
+        final Collection<FeedFrameCollector> monitor = registeredCollectors.values();
+        synchronized (monitor) {
+            while (!allCollectorsFinished()) {
+                try {
+                    monitor.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Interrupted while waiting for collectors to finish, feed " + feedId);
+                    }
+                    return;
+                }
+            }
+        }
+    }
+
+    /** Returns {@code true} if every registered collector is FINISHED (also when none is registered). */
+    private boolean allCollectorsFinished() {
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            if (!collector.getState().equals(FeedFrameCollector.State.FINISHED)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Returns a live view of the registered collectors. */
+    public Collection<FeedFrameCollector> getRegisteredCollectors() {
+        return registeredCollectors.values();
+    }
+
+    /** Returns the internal writer-to-collector map itself, not a copy. */
+    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
+        return registeredCollectors;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public DistributionMode getDistributionMode() {
+        return distributionMode;
+    }
+
+    public FeedRuntimeType getFeedRuntimeType() {
+        return feedRuntimeType;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public FrameTupleAccessor getFta() {
+        return fta;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java
new file mode 100644
index 0000000..780a332
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FrameEventCallback.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Reacts to {@code FrameEvent}s by switching the mode of a
+ * {@link FeedRuntimeInputHandler} according to the feed policy, and wakes up threads
+ * waiting on the core operator once processing has finished.
+ */
+public class FrameEventCallback implements IFrameEventCallback {
+
+    private static final Logger LOGGER = Logger.getLogger(FrameEventCallback.class.getName());
+
+    private final FeedPolicyAccessor fpa;
+    private final FeedRuntimeInputHandler inputSideHandler;
+    /** Operator whose monitor is notified on FINISHED_PROCESSING; replaceable via {@link #setCoreOperator}. */
+    private IFrameWriter coreOperator;
+
+    public FrameEventCallback(FeedPolicyAccessor fpa, FeedRuntimeInputHandler inputSideHandler,
+            IFrameWriter coreOperator) {
+        this.fpa = fpa;
+        this.inputSideHandler = inputSideHandler;
+        this.coreOperator = coreOperator;
+    }
+
+    /**
+     * Handles a frame event. While the input handler is in {@code PROCESS_SPILL} mode
+     * every event except {@code FINISHED_PROCESSING_SPILLAGE} is ignored.
+     */
+    @Override
+    public void frameEvent(FrameEvent event) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Frame Event for " + inputSideHandler.getRuntimeId() + " " + event);
+        }
+        if (!event.equals(FrameEvent.FINISHED_PROCESSING_SPILLAGE)
+                && inputSideHandler.getMode().equals(Mode.PROCESS_SPILL)) {
+            return;
+        }
+        switch (event) {
+            case PENDING_WORK_THRESHOLD_REACHED:
+                handleCongestion();
+                break;
+            case FINISHED_PROCESSING:
+                inputSideHandler.setFinished(true);
+                notifyCoreOperator();
+                break;
+            case PENDING_WORK_DONE:
+                resumeProcessing(event);
+                break;
+            case FINISHED_PROCESSING_SPILLAGE:
+                inputSideHandler.setMode(Mode.PROCESS);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Applies the first congestion measure the feed policy permits: spill, discard,
+     * throttle; if none is permitted the congestion is reported as unresolvable.
+     */
+    private void handleCongestion() {
+        if (fpa.spillToDiskOnCongestion()) {
+            inputSideHandler.setMode(Mode.SPILL);
+        } else if (fpa.discardOnCongestion()) {
+            inputSideHandler.setMode(Mode.DISCARD);
+        } else if (fpa.throttlingEnabled()) {
+            inputSideHandler.setThrottlingEnabled(true);
+        } else {
+            try {
+                inputSideHandler.reportUnresolvableCongestion();
+            } catch (HyracksDataException e) {
+                // best effort: keep going, but record why the report failed
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unable to report congestion!!!", e);
+                }
+            }
+        }
+    }
+
+    /** Wakes up all threads waiting on the core operator's monitor, if an operator is set. */
+    private void notifyCoreOperator() {
+        // Snapshot the field once: setCoreOperator() may replace it, and notifyAll() must be
+        // called on exactly the object whose monitor is held.
+        final IFrameWriter operator = coreOperator;
+        if (operator == null) {
+            return;
+        }
+        synchronized (operator) {
+            operator.notifyAll();
+        }
+    }
+
+    /** Returns the input handler to PROCESS mode if it is currently spilling or discarding. */
+    private void resumeProcessing(FrameEvent event) {
+        switch (inputSideHandler.getMode()) {
+            case SPILL:
+            case DISCARD:
+            case POST_SPILL_DISCARD:
+                inputSideHandler.setMode(Mode.PROCESS);
+                break;
+            default:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Received " + event + " ignoring as operating in " + inputSideHandler.getMode());
+                }
+        }
+    }
+
+    public void setCoreOperator(IFrameWriter coreOperator) {
+        this.coreOperator = coreOperator;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java
deleted file mode 100644
index 6cdc45c..0000000
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFeedManager.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.common.feeds;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
-
-/**
- * Handle (de)registration of feeds for delivery of control messages.
- * Also gives access to the per-feed executor service, runtime manager,
- * Super Feed Manager and message service.
- */
-public interface IFeedManager {
-
- // NOTE(review): unit is not stated here; presumably milliseconds - confirm with users of the constant.
- public static final long SOCKET_CONNECT_TIMEOUT = 5000;
-
- /**
- * Returns the executor service associated with the feed.
- *
- * @param feedId
- *            the feed connection whose executor service is requested
- * @return the executor service associated with the feed
- */
- public ExecutorService getFeedExecutorService(FeedConnectionId feedId);
-
- /**
- * Allows registration of a feedRuntime.
- *
- * @param feedRuntime
- *            the feed runtime to register
- * @throws Exception
- *             declared by the method; the failure conditions are implementation specific
- */
- public void registerFeedRuntime(FeedRuntime feedRuntime) throws Exception;
-
- /**
- * Allows de-registration of a feed runtime.
- *
- * @param feedRuntimeId
- *            identifier of the feed runtime to de-register
- */
- public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId);
-
- /**
- * Obtain feed runtime corresponding to a feedRuntimeId
- *
- * @param feedRuntimeId
- *            identifier of the requested feed runtime
- * @return the feed runtime corresponding to the identifier
- */
- public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId);
-
- /**
- * Register the Super Feed Manager associated with a feed.
- *
- * @param feedId
- *            the feed connection the Super Feed Manager belongs to
- * @param sfm
- *            the Super Feed Manager to register
- * @throws Exception
- *             declared by the method; the failure conditions are implementation specific
- */
- public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) throws Exception;
-
- /**
- * Obtain a handle to the Super Feed Manager associated with the feed.
- *
- * @param feedId
- *            the feed connection whose Super Feed Manager is requested
- * @return the Super Feed Manager associated with the feed
- */
- public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId);
-
- /**
- * De-register a feed. The method declares no checked exception.
- *
- * @param feedId
- *            the feed connection to de-register
- */
- void deregisterFeed(FeedConnectionId feedId);
-
- /**
- * Obtain the feed runtime manager associated with a feed.
- *
- * @param feedId
- *            the feed connection whose runtime manager is requested
- * @return the feed runtime manager associated with the feed
- */
- public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId);
-
- /**
- * Obtain a handle to the feed Message service associated with a feed.
- *
- * @param feedId
- *            the feed connection whose message service is requested
- * @return the feed message service associated with the feed
- */
- public FeedMessageService getFeedMessageService(FeedConnectionId feedId);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java
new file mode 100644
index 0000000..3bda2db
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePostProcessor.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Hook for post-processing a frame.
+ */
+public interface IFramePostProcessor {
+
+    /**
+     * Post-processes the given frame.
+     *
+     * @param frame
+     *            the frame to post-process
+     * @param frameAccessor
+     *            tuple accessor passed along with the frame (presumably for reading the
+     *            frame's tuples; confirm against implementations)
+     */
+    void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java
new file mode 100644
index 0000000..468c13c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IFramePreprocessor.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Hook for pre-processing a frame.
+ */
+public interface IFramePreprocessor {
+
+    /**
+     * Pre-processes the given frame.
+     *
+     * @param frame
+     *            the frame to pre-process
+     * @throws Exception
+     *             if pre-processing fails; the conditions are implementation specific
+     */
+    void preProcess(ByteBuffer frame) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java
new file mode 100644
index 0000000..b7eb3b8
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IngestionRuntime.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.logging.Level;
+
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class IngestionRuntime extends SubscribableRuntime {
+
+ private final IAdapterRuntimeManager adapterRuntimeManager;
+
+ public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
+ RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
+ super(feedId, runtimeId, null, feedWriter, recordDesc);
+ this.adapterRuntimeManager = adaptorRuntimeManager;
+ }
+
+ public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception {
+ FeedFrameCollector reader = dWriter.subscribeFeed(fpa, collectionRuntime.getInputHandler(),
+ collectionRuntime.getConnectionId());
+ collectionRuntime.setFrameCollector(reader);
+
+ if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+ adapterRuntimeManager.start();
+ }
+ subscribers.add(collectionRuntime);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
+ }
+ }
+
+ public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
+ dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
+ }
+ if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.INACTIVE)) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Stopping adapter for " + this + " as no more registered collectors");
+ }
+ adapterRuntimeManager.stop();
+ }
+ subscribers.remove(collectionRuntime);
+ }
+
+ public void endOfFeed() {
+ dWriter.notifyEndOfFeed();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Notified End Of Feed [" + this + "]");
+ }
+ }
+
+ public IAdapterRuntimeManager getAdapterRuntimeManager() {
+ return adapterRuntimeManager;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java
new file mode 100644
index 0000000..656797e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakePartitionStatistics.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.BitSet;
+
+/**
+ * Tracks acknowledged record ids of one intake partition as bits inside a window of
+ * {@link #ACK_WINDOW_SIZE} positions.
+ */
+public class IntakePartitionStatistics {
+
+    /** Number of positions in the acknowledgement window. */
+    public static int ACK_WINDOW_SIZE = 1024;
+
+    private int partition;
+    private int base;
+    private BitSet bitSet;
+
+    public IntakePartitionStatistics(int partition, int base) {
+        this.bitSet = new BitSet(ACK_WINDOW_SIZE);
+        this.base = base;
+        this.partition = partition;
+    }
+
+    /**
+     * Marks a record as acknowledged.
+     *
+     * @param recordId
+     *            id of the record; its remainder modulo {@link #ACK_WINDOW_SIZE} selects the bit
+     */
+    public void ackRecordId(int recordId) {
+        bitSet.set(recordId % ACK_WINDOW_SIZE);
+    }
+
+    /** @return the acknowledgement bits as produced by {@link BitSet#toByteArray()} */
+    public byte[] getAckInfo() {
+        byte[] ackInfo = bitSet.toByteArray();
+        return ackInfo;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java
new file mode 100644
index 0000000..fdd6ec4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/IntakeSideMonitoredBuffer.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Monitored buffer created by {@code MonitoredBuffer.getMonitoredBuffer} for COLLECT runtimes.
+ * Inflow-rate reporting is the only monitoring hook it switches on, and it installs no frame
+ * pre- or post-processor.
+ */
+public class IntakeSideMonitoredBuffer extends MonitoredBuffer {
+
+    public IntakeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+    }
+
+    // ---- rate reporting: inflow only ----
+
+    @Override
+    protected boolean reportInflowRate() {
+        return true;
+    }
+
+    @Override
+    protected boolean reportOutflowRate() {
+        return false;
+    }
+
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return false;
+    }
+
+    // ---- monitoring tasks: all disabled ----
+
+    @Override
+    protected boolean monitorProcessingRate() {
+        return false;
+    }
+
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return false;
+    }
+
+    // ---- frame hooks: none ----
+
+    @Override
+    protected IFramePreprocessor getFramePreProcessor() {
+        return null;
+    }
+
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
index 7beb212..2e21ea7 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageListener.java
@@ -43,15 +43,12 @@ public class MessageListener {
public void stop() {
listenerServer.stop();
- System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
-
}
public void start() throws IOException {
- System.out.println("STARTING MESSAGE RECEIVING SERVICE AT " + port);
listenerServer = new MessageListenerServer(port, outbox);
executorService.execute(listenerServer);
}
@@ -62,6 +59,8 @@ public class MessageListener {
private final LinkedBlockingQueue<String> outbox;
private ServerSocket server;
+ private static final char EOL = (char) "\n".getBytes()[0];
+
public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
this.port = port;
this.outbox = outbox;
@@ -77,7 +76,6 @@ public class MessageListener {
@Override
public void run() {
- char EOL = (char) "\n".getBytes()[0];
Socket client = null;
try {
server = new ServerSocket(port);
@@ -121,59 +119,4 @@ public class MessageListener {
}
- private static class MessageParser implements Runnable {
-
- private Socket client;
- private IMessageAnalyzer messageAnalyzer;
- private static final char EOL = (char) "\n".getBytes()[0];
-
- public MessageParser(Socket client, IMessageAnalyzer messageAnalyzer) {
- this.client = client;
- this.messageAnalyzer = messageAnalyzer;
- }
-
- @Override
- public void run() {
- CharBuffer buffer = CharBuffer.allocate(5000);
- char ch;
- try {
- InputStream in = client.getInputStream();
- while (true) {
- ch = (char) in.read();
- if (((int) ch) == -1) {
- break;
- }
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- String s = new String(buffer.array());
- synchronized (messageAnalyzer) {
- messageAnalyzer.getMessageQueue().add(s + "\n");
- }
- buffer.position(0);
- buffer.limit(5000);
- }
- } catch (IOException ioe) {
- ioe.printStackTrace();
- } finally {
- try {
- client.close();
- } catch (IOException ioe) {
- // do nothing
- }
- }
- }
- }
-
- public static interface IMessageAnalyzer {
-
- /**
- * @return
- */
- public LinkedBlockingQueue<String> getMessageQueue();
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java
new file mode 100644
index 0000000..2cb1066
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MessageReceiver.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IMessageReceiver;
+
+/**
+ * Base class for receivers that queue incoming messages in an unbounded inbox and hand them, one
+ * at a time, to {@link #processMessage(Object)} on a single worker thread.
+ *
+ * @param <T>
+ *            type of the messages
+ */
+public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
+
+    protected static final Logger LOGGER = Logger.getLogger(MessageReceiver.class.getName());
+
+    protected final LinkedBlockingQueue<T> inbox;
+    protected ExecutorService executor;
+
+    public MessageReceiver() {
+        inbox = new LinkedBlockingQueue<T>();
+    }
+
+    /**
+     * Handles one message taken from the inbox.
+     *
+     * @param message
+     *            the message to handle
+     * @throws Exception
+     *             if the message cannot be processed; the worker logs it and moves on
+     */
+    public abstract void processMessage(T message) throws Exception;
+
+    /** Creates the single-threaded executor and starts the worker that drains the inbox. */
+    @Override
+    public void start() {
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(new MessageReceiverRunnable<T>(this));
+    }
+
+    /** Appends the message to the inbox. */
+    @Override
+    public synchronized void sendMessage(T message) {
+        inbox.add(message);
+    }
+
+    /**
+     * Stops the worker thread. Does nothing when the receiver was never started or is already
+     * closed.
+     *
+     * @param processPending
+     *            {@code true} to process the messages still queued on the calling thread;
+     *            {@code false} to leave them in the inbox
+     */
+    @Override
+    public void close(boolean processPending) {
+        if (executor != null) {
+            // shutdown() does not interrupt a worker that is blocked in inbox.take(), so the
+            // worker thread would never terminate; shutdownNow() interrupts it.
+            executor.shutdownNow();
+            executor = null;
+            if (processPending) {
+                flushPendingMessages();
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Will discard the pending frames " + inbox.size());
+                }
+            }
+        }
+    }
+
+    /** Worker that takes messages from the inbox until its thread is interrupted. */
+    private static class MessageReceiverRunnable<T> implements Runnable {
+
+        private final LinkedBlockingQueue<T> inbox;
+        private final MessageReceiver<T> messageReceiver;
+
+        public MessageReceiverRunnable(MessageReceiver<T> messageReceiver) {
+            this.inbox = messageReceiver.inbox;
+            this.messageReceiver = messageReceiver;
+        }
+
+        @Override
+        public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    T message = inbox.take();
+                    messageReceiver.processMessage(message);
+                } catch (InterruptedException e) {
+                    // restore the flag so that the loop condition ends the worker
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    // a failing message must not take the worker down
+                    LOGGER.log(Level.WARNING, "Exception " + e + " in processing message", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Processes, on the calling thread, every message that is currently queued. Stops early if
+     * processing is interrupted.
+     */
+    protected void flushPendingMessages() {
+        T message;
+        // poll() never blocks; isEmpty() followed by take() could wait forever when another
+        // thread removes the last message in between the two calls.
+        while ((message = inbox.poll()) != null) {
+            try {
+                processMessage(message);
+            } catch (InterruptedException ie) {
+                // stop flushing, but keep the interrupt visible to the caller
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Exception " + e + " in processing message " + message, e);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java
new file mode 100644
index 0000000..90e340f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBuffer.java
@@ -0,0 +1,386 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.LogInputOutputRateTask;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitorInputQueueLengthTimerTask;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoreProcessRateTimerTask;
+import edu.uci.ics.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.MetricType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Message receiver for {@link DataBucket}s that forwards every data frame to a frame writer and,
+ * depending on the hooks implemented by the subclass and on the feed policy, schedules timer
+ * tasks that monitor the processing rate, the input queue length and the inflow/outflow rates.
+ * <p>
+ * Instances are obtained through
+ * {@link #getMonitoredBuffer(IHyracksTaskContext, FeedRuntimeInputHandler, IFrameWriter, FrameTupleAccessor, RecordDescriptor, IFeedMetricCollector, FeedConnectionId, FeedRuntimeId, IExceptionHandler, IFrameEventCallback, int, FeedPolicyAccessor)},
+ * which selects the subclass from the feed runtime type.
+ */
+public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
+
+    protected static final long LOG_INPUT_OUTPUT_RATE_FREQUENCY = 5000; // 5 seconds
+    protected static final long INPUT_QUEUE_MEASURE_FREQUENCY = 1000; // 1 second
+    protected static final long PROCESSING_RATE_MEASURE_FREQUENCY = 10000; // 10 seconds
+
+    protected static final int PROCESS_RATE_REFRESH = 2; // refresh processing rate every 2nd frame
+
+    protected final IHyracksTaskContext ctx;
+    protected final FeedConnectionId connectionId;
+    protected final FeedRuntimeId runtimeId;
+    protected final FrameTupleAccessor inflowFta;
+    protected final FrameTupleAccessor outflowFta;
+    protected final FeedRuntimeInputHandler inputHandler;
+    protected final IFrameEventCallback callback;
+    protected final Timer timer;
+    private final RecordDescriptor recordDesc;
+    private final IExceptionHandler exceptionHandler;
+    protected final FeedPolicyAccessor policyAccessor;
+    protected int nPartitions;
+
+    private IFrameWriter frameWriter;
+    protected IFeedMetricCollector metricCollector;
+    protected boolean monitorProcessingRate = false;
+    protected boolean monitorInputQueueLength = false;
+    protected boolean logInflowOutflowRate = false;
+    protected boolean reportOutflowRate = false;
+    protected boolean reportInflowRate = false;
+
+    protected int inflowReportSenderId = -1;
+    protected int outflowReportSenderId = -1;
+    protected TimerTask monitorInputQueueLengthTask;
+    protected TimerTask processingRateTask;
+    protected TimerTask logInflowOutflowRateTask;
+    protected MonitoredBufferStorageTimerTask storageTimeTrackingRateTask;
+    protected StorageFrameHandler storageFromeHandler;
+
+    // -1 until the first measurement has been taken in postProcessFrame(long, ByteBuffer)
+    protected int processingRate = -1;
+    protected int frameCount = 0;
+    private long avgDelayPersistence = 0;
+    private boolean active;
+    private Map<Integer, Long> tupleTimeStats;
+    // both processors are obtained lazily from the subclass hooks
+    IFramePostProcessor postProcessor = null;
+    IFramePreprocessor preProcessor = null;
+
+    /**
+     * Creates the monitored buffer that matches the feed runtime type of {@code runtimeId}:
+     * COMPUTE, STORE and COLLECT map to their dedicated subclasses, every other type to
+     * {@code BasicMonitoredBuffer}. All arguments are passed through to the chosen constructor.
+     */
+    public static MonitoredBuffer getMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        switch (runtimeId.getFeedRuntimeType()) {
+            case COMPUTE:
+                return new ComputeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
+                        metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
+                        policyAccessor);
+            case STORE:
+                return new StorageSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
+                        metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
+                        policyAccessor);
+            case COLLECT:
+                return new IntakeSideMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc,
+                        metricCollector, connectionId, runtimeId, exceptionHandler, callback, nPartitions,
+                        policyAccessor);
+            default:
+                return new BasicMonitoredBuffer(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector,
+                        connectionId, runtimeId, exceptionHandler, callback, nPartitions, policyAccessor);
+        }
+    }
+
+    /**
+     * Stores the collaborators, builds the inflow and outflow accessors from {@code recordDesc}
+     * (the {@code fta} argument is not used) and sets up monitoring.
+     */
+    protected MonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+        this.ctx = ctx;
+        this.connectionId = connectionId;
+        this.frameWriter = frameWriter;
+        this.inflowFta = new FrameTupleAccessor(recordDesc);
+        this.outflowFta = new FrameTupleAccessor(recordDesc);
+        this.runtimeId = runtimeId;
+        this.metricCollector = metricCollector;
+        this.exceptionHandler = exceptionHandler;
+        this.callback = callback;
+        this.inputHandler = inputHandler;
+        this.timer = new Timer();
+        this.recordDesc = recordDesc;
+        this.policyAccessor = policyAccessor;
+        this.nPartitions = nPartitions;
+        this.active = true;
+        // NOTE(review): this calls the overridable hooks below before the subclass constructor
+        // body has run, so the hooks must not depend on subclass state.
+        initializeMonitoring();
+    }
+
+    /** @return whether the processing rate is to be measured */
+    protected abstract boolean monitorProcessingRate();
+
+    /** @return whether the inflow/outflow rates are to be logged */
+    protected abstract boolean logInflowOutflowRate();
+
+    /** @return whether the outflow rate is to be reported */
+    protected abstract boolean reportOutflowRate();
+
+    /** @return whether the inflow rate is to be reported */
+    protected abstract boolean reportInflowRate();
+
+    /** @return whether the length of the input queue is to be monitored */
+    protected abstract boolean monitorInputQueueLength();
+
+    /** @return the processor to run before a frame is written, or {@code null} for none */
+    protected abstract IFramePreprocessor getFramePreProcessor();
+
+    /** @return the processor to run after a frame is written, or {@code null} for none */
+    protected abstract IFramePostProcessor getFramePostProcessor();
+
+    /**
+     * Reads the monitoring flags from the subclass hooks and the feed policy and schedules the
+     * timer tasks they call for.
+     */
+    protected void initializeMonitoring() {
+        monitorProcessingRate = monitorProcessingRate();
+        monitorInputQueueLength = monitorInputQueueLength();
+        reportInflowRate = reportInflowRate();
+        reportOutflowRate = reportOutflowRate();
+        logInflowOutflowRate = policyAccessor.isLoggingStatisticsEnabled() || logInflowOutflowRate();
+
+        if (monitorProcessingRate && policyAccessor.isElastic()) { // check possibility to scale in
+            this.processingRateTask = new MonitoreProcessRateTimerTask(this, inputHandler.getFeedManager(),
+                    connectionId, nPartitions);
+            this.timer.scheduleAtFixedRate(processingRateTask, 0, PROCESSING_RATE_MEASURE_FREQUENCY);
+        }
+
+        if (monitorInputQueueLength
+                && (policyAccessor.isElastic() || policyAccessor.throttlingEnabled()
+                        || policyAccessor.spillToDiskOnCongestion() || policyAccessor.discardOnCongestion())) {
+            this.monitorInputQueueLengthTask = new MonitorInputQueueLengthTimerTask(this, callback);
+            this.timer.scheduleAtFixedRate(monitorInputQueueLengthTask, 0, INPUT_QUEUE_MEASURE_FREQUENCY);
+        }
+
+        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+            this.logInflowOutflowRateTask = new LogInputOutputRateTask(this, logInflowOutflowRate, reportInflowRate,
+                    reportOutflowRate);
+            this.timer.scheduleAtFixedRate(logInflowOutflowRateTask, 0, LOG_INPUT_OUTPUT_RATE_FREQUENCY);
+            this.inflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+                    ValueType.INFLOW_RATE, MetricType.RATE);
+            this.outflowReportSenderId = metricCollector.createReportSender(connectionId, runtimeId,
+                    ValueType.OUTFLOW_RATE, MetricType.RATE);
+        }
+    }
+
+    /** Cancels the scheduled monitoring tasks and removes the report senders. */
+    protected void deinitializeMonitoring() {
+        if (monitorInputQueueLengthTask != null) {
+            monitorInputQueueLengthTask.cancel();
+        }
+        if (processingRateTask != null) {
+            processingRateTask.cancel();
+        }
+        if (logInflowOutflowRate || reportInflowRate || reportOutflowRate) {
+            metricCollector.removeReportSender(inflowReportSenderId);
+            metricCollector.removeReportSender(outflowReportSenderId);
+            // guarded like the other tasks: the flags are protected and may have changed since
+            // the task was (or was not) created
+            if (logInflowOutflowRateTask != null) {
+                logInflowOutflowRateTask.cancel();
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Disabled monitoring for " + this.runtimeId);
+        }
+    }
+
+    /**
+     * Book-keeping after a frame has been written: refreshes the processing rate on every
+     * {@link #PROCESS_RATE_REFRESH}-th frame, reports the outflow and runs the post-processor.
+     *
+     * @param startTime
+     *            time in milliseconds at which processing of the frame started
+     * @param frame
+     *            the frame that was written
+     */
+    protected void postProcessFrame(long startTime, ByteBuffer frame) throws Exception {
+        if (monitorProcessingRate) {
+            frameCount++;
+            if (frameCount % PROCESS_RATE_REFRESH == 0) {
+                long endTime = System.currentTimeMillis();
+                // floating-point division: an elapsed time of 0 ms does not throw
+                processingRate = (int) ((double) outflowFta.getTupleCount() * 1000 / (endTime - startTime));
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Processing Rate :" + processingRate + " tuples/sec");
+                }
+                frameCount = 0;
+            }
+        }
+
+        if (logInflowOutflowRate || reportOutflowRate) {
+            metricCollector.sendReport(outflowReportSenderId, outflowFta.getTupleCount());
+        }
+
+        postProcessFrame(frame);
+
+    }
+
+    /** Runs the pre-processor, if the subclass supplies one, on the frame. */
+    protected void preProcessFrame(ByteBuffer frame) throws Exception {
+        // The lazy initialization must test preProcessor itself; testing postProcessor (as the
+        // code used to) re-created the pre-processor for every frame whenever no post-processor
+        // was installed.
+        if (preProcessor == null) {
+            preProcessor = getFramePreProcessor();
+        }
+        if (preProcessor != null) {
+            preProcessor.preProcess(frame);
+        }
+    }
+
+    /** Runs the post-processor, if the subclass supplies one, on the frame. */
+    protected void postProcessFrame(ByteBuffer frame) throws Exception {
+        if (postProcessor == null) {
+            postProcessor = getFramePostProcessor();
+        }
+        if (postProcessor != null) {
+            outflowFta.reset(frame);
+            postProcessor.postProcessFrame(frame, outflowFta);
+        }
+    }
+
+    @Override
+    public void sendMessage(DataBucket message) {
+        inbox.add(message);
+    }
+
+    /**
+     * Reports the tuple count of an incoming frame as inflow, unless the input handler is in
+     * PROCESS_BACKLOG or PROCESS_SPILL mode.
+     */
+    public void sendReport(ByteBuffer frame) {
+        if ((logInflowOutflowRate || reportInflowRate)
+                && !(inputHandler.getMode().equals(Mode.PROCESS_BACKLOG) || inputHandler.getMode().equals(
+                        Mode.PROCESS_SPILL))) {
+            inflowFta.reset(frame);
+            metricCollector.sendReport(inflowReportSenderId, inflowFta.getTupleCount());
+        }
+    }
+
+    /** return rate in terms of tuples/sec **/
+    public int getInflowRate() {
+        return metricCollector.getMetric(inflowReportSenderId);
+    }
+
+    /** return rate in terms of tuples/sec **/
+    public int getOutflowRate() {
+        return metricCollector.getMetric(outflowReportSenderId);
+    }
+
+    /** return the number of pending frames from the input queue **/
+    public int getWorkSize() {
+        return inbox.size();
+    }
+
+    /** reset the number of partitions (cardinality) for the runtime **/
+    public void setNumberOfPartitions(int nPartitions) {
+        if (processingRateTask != null) {
+            MonitoreProcessRateTimerTask rateTask = (MonitoreProcessRateTimerTask) processingRateTask;
+            if (rateTask.getNumberOfPartitions() != nPartitions) {
+                rateTask.setNumberOfPartitions(nPartitions);
+            }
+        }
+    }
+
+    public FeedRuntimeInputHandler getInputHandler() {
+        return inputHandler;
+    }
+
+    /**
+     * Closes the receiver and marks the buffer inactive, so that messages processed afterwards
+     * are only released.
+     *
+     * @param processPending
+     *            passed on to {@link #close(boolean)}
+     * @param disableMonitoring
+     *            whether the monitoring tasks are cancelled as well
+     */
+    public synchronized void close(boolean processPending, boolean disableMonitoring) {
+        super.close(processPending);
+        if (disableMonitoring) {
+            deinitializeMonitoring();
+        }
+        active = false;
+    }
+
+    /**
+     * Handles one bucket. A DATA bucket is (optionally throttled and) written to the frame
+     * writer; EOD cancels the timer and signals FINISHED_PROCESSING; EOSD signals
+     * FINISHED_PROCESSING_SPILLAGE. The bucket is released in every case. While the buffer is
+     * inactive, buckets are released without being processed.
+     */
+    @Override
+    public synchronized void processMessage(DataBucket message) throws Exception {
+        if (!active) {
+            message.doneReading();
+            return;
+        }
+        switch (message.getContentType()) {
+            case DATA:
+                ByteBuffer frameReceived = message.getContent();
+                ByteBuffer frameToProcess = frameReceived;
+                if (inputHandler.isThrottlingEnabled()) {
+                    inflowFta.reset(frameReceived);
+                    int pRate = getProcessingRate();
+                    int inflowRate = getInflowRate();
+                    // NOTE(review): pRate is -1 until the first measurement, which yields a
+                    // negative retain fraction here - confirm that this is intended.
+                    if (inflowRate > pRate) {
+                        double retainFraction = (pRate * 0.8 / inflowRate);
+                        frameToProcess = throttleFrame(inflowFta, retainFraction);
+                        inflowFta.reset(frameToProcess);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Throttling at fraction " + retainFraction + "inflow rate " + inflowRate
+                                    + " no of tuples remaining " + inflowFta.getTupleCount());
+
+                        }
+                    }
+                }
+                outflowFta.reset(frameToProcess);
+                // The frame is attempted exactly once: a failure is passed to the exception
+                // handler and the frame is not retried.
+                try {
+                    inflowFta.reset(frameToProcess);
+                    long startTime = System.currentTimeMillis();
+                    preProcessFrame(frameToProcess);
+                    frameWriter.nextFrame(frameToProcess);
+                    postProcessFrame(startTime, frameToProcess);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Exception " + e + " in processing frame for " + runtimeId, e);
+                    exceptionHandler.handleException(e, frameToProcess);
+                }
+                message.doneReading();
+                break;
+            case EOD:
+                message.doneReading();
+                timer.cancel();
+                callback.frameEvent(FrameEvent.FINISHED_PROCESSING);
+                break;
+            case EOSD:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Done processing spillage");
+                }
+                message.doneReading();
+                callback.frameEvent(FrameEvent.FINISHED_PROCESSING_SPILLAGE);
+                break;
+
+        }
+    }
+
+    /** Samples the frame down to {@code retainFraction} of its tuple count. */
+    private ByteBuffer throttleFrame(FrameTupleAccessor fta, double retainFraction) throws HyracksDataException {
+        int desiredTuples = (int) (fta.getTupleCount() * retainFraction);
+        return FeedFrameUtil.getSampledFrame(ctx, fta, desiredTuples);
+    }
+
+    public Mode getMode() {
+        return inputHandler.getMode();
+    }
+
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    public void setFrameWriter(IFrameWriter frameWriter) {
+        this.frameWriter = frameWriter;
+    }
+
+    /** Re-activates the buffer and, when rate logging is enabled, resets both report senders. */
+    public void reset() {
+        active = true;
+        if (logInflowOutflowRate) {
+            metricCollector.resetReportSender(inflowReportSenderId);
+            metricCollector.resetReportSender(outflowReportSenderId);
+        }
+    }
+
+    /** @return the last measured processing rate in tuples/sec, or -1 if none was taken yet */
+    public int getProcessingRate() {
+        return processingRate;
+    }
+
+    public Map<Integer, Long> getTupleTimeStats() {
+        return tupleTimeStats;
+    }
+
+    public long getAvgDelayRecordPersistence() {
+        return avgDelayPersistence;
+    }
+
+    public MonitoredBufferStorageTimerTask getStorageTimeTrackingRateTask() {
+        return storageTimeTrackingRateTask;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java
new file mode 100644
index 0000000..13c979f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/MonitoredBufferTimerTasks.java
@@ -0,0 +1,290 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback.FrameEvent;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.StorageReportFeedMessage;
+
+ /**
+  * Holder for the {@link TimerTask} implementations that periodically inspect a
+  * monitored feed buffer and report their findings through the feed message service
+  * or a frame event callback.
+  */
+ public class MonitoredBufferTimerTasks {
+
+     // One logger is shared by all nested tasks, so it is named after the enclosing class.
+     private static final Logger LOGGER = Logger.getLogger(MonitoredBufferTimerTasks.class.getName());
+
+     /**
+      * Storage-side task: on each run it optionally sends tuple commit acknowledgements
+      * and optionally checks the average persistence delay against the feed policy.
+      */
+     public static class MonitoredBufferStorageTimerTask extends TimerTask {
+
+         // Number of consecutive violating runs tolerated before a report is sent.
+         private static final int PERSISTENCE_DELAY_VIOLATION_MAX = 5;
+
+         private final StorageSideMonitoredBuffer mBuffer;
+         private final IFeedManager feedManager;
+         private final int partition;
+         private final FeedConnectionId connectionId;
+         private final FeedPolicyAccessor policyAccessor;
+         private final StorageFrameHandler storageFromeHandler;
+         // Both messages are allocated once and re-populated via reset(...) before each send.
+         private final StorageReportFeedMessage storageReportMessage;
+         private final FeedTupleCommitAckMessage tupleCommitAckMessage;
+
+         // Intake partition -> highest base reported back through receiveCommitAckResponse.
+         private Map<Integer, Integer> maxIntakeBaseCovered;
+         private int countDelayExceeded = 0;
+
+         /**
+          * @param mBuffer
+          *            storage-side buffer being monitored
+          * @param feedManager
+          *            source of the feed message service used for sending reports
+          * @param connectionId
+          *            feed connection the reports refer to
+          * @param partition
+          *            partition stamped on the storage report message
+          * @param policyAccessor
+          *            provides the maximum tolerated persistence delay
+          * @param storageFromeHandler
+          *            provides per-partition ack statistics and the average persistence delay
+          */
+         public MonitoredBufferStorageTimerTask(StorageSideMonitoredBuffer mBuffer, IFeedManager feedManager,
+                 FeedConnectionId connectionId, int partition, FeedPolicyAccessor policyAccessor,
+                 StorageFrameHandler storageFromeHandler) {
+             this.mBuffer = mBuffer;
+             this.feedManager = feedManager;
+             this.connectionId = connectionId;
+             this.partition = partition;
+             this.policyAccessor = policyAccessor;
+             this.storageFromeHandler = storageFromeHandler;
+             this.storageReportMessage = new StorageReportFeedMessage(this.connectionId, this.partition, 0, false, 0, 0);
+             this.tupleCommitAckMessage = new FeedTupleCommitAckMessage(this.connectionId, 0, 0, null);
+             this.maxIntakeBaseCovered = new HashMap<Integer, Integer>();
+         }
+
+         /**
+          * Sends acks when acking is enabled and throttling is not, then checks the
+          * persistence latency when time tracking is enabled.
+          */
+         @Override
+         public void run() {
+             if (mBuffer.isAckingEnabled() && !mBuffer.getInputHandler().isThrottlingEnabled()) {
+                 ackRecords();
+             }
+             if (mBuffer.isTimeTrackingEnabled()) {
+                 checkLatencyViolation();
+             }
+         }
+
+         /**
+          * Walks the ack statistics of every intake partition. A base that is not yet
+          * covered by the recorded maximum for its partition is sent as a commit ack;
+          * a base that is already covered is dropped from the handler's statistics.
+          */
+         private void ackRecords() {
+             Set<Integer> partitions = storageFromeHandler.getPartitionsWithStats();
+             List<Integer> basesCovered = new ArrayList<Integer>();
+             for (int intakePartition : partitions) {
+                 Map<Integer, IntakePartitionStatistics> baseAcks = storageFromeHandler
+                         .getBaseAcksForPartition(intakePartition);
+                 for (Entry<Integer, IntakePartitionStatistics> entry : baseAcks.entrySet()) {
+                     int base = entry.getKey();
+                     IntakePartitionStatistics stats = entry.getValue();
+                     Integer maxIntakeBaseForPartition = maxIntakeBaseCovered.get(intakePartition);
+                     if (maxIntakeBaseForPartition == null || maxIntakeBaseForPartition < base) {
+                         tupleCommitAckMessage.reset(intakePartition, base, stats.getAckInfo());
+                         feedManager.getFeedMessageService().sendMessage(tupleCommitAckMessage);
+                     } else {
+                         basesCovered.add(base);
+                     }
+                 }
+                 // Removal is deferred until the entry-set iteration above has finished.
+                 for (Integer b : basesCovered) {
+                     baseAcks.remove(b);
+                 }
+                 basesCovered.clear();
+             }
+         }
+
+         /**
+          * Counts consecutive runs in which the average persistence delay exceeds the
+          * policy maximum. Once the count exceeds {@link #PERSISTENCE_DELAY_VIOLATION_MAX}
+          * a storage report is sent on every further violating run; a non-violating run
+          * resets the count.
+          */
+         private void checkLatencyViolation() {
+             long avgDelayPersistence = storageFromeHandler.getAvgDelayPersistence();
+             if (avgDelayPersistence > policyAccessor.getMaxDelayRecordPersistence()) {
+                 countDelayExceeded++;
+                 if (countDelayExceeded > PERSISTENCE_DELAY_VIOLATION_MAX) {
+                     storageReportMessage.reset(0, false, mBuffer.getAvgDelayRecordPersistence());
+                     feedManager.getFeedMessageService().sendMessage(storageReportMessage);
+                 }
+             } else {
+                 countDelayExceeded = 0;
+             }
+         }
+
+         /**
+          * Records, for the intake partition named in the response, the maximum window
+          * acknowledged so far; {@link #ackRecords()} uses it to stop re-sending covered bases.
+          *
+          * @param message
+          *            commit response carrying the intake partition and its max acked window
+          */
+         public void receiveCommitAckResponse(FeedTupleCommitResponseMessage message) {
+             maxIntakeBaseCovered.put(message.getIntakePartition(), message.getMaxWindowAcked());
+         }
+     }
+
+     /**
+      * Task that logs and/or reports the inflow and outflow rate of a monitored buffer.
+      * When both reporting flags are set, only the inflow rate is reported.
+      */
+     public static class LogInputOutputRateTask extends TimerTask {
+
+         private final MonitoredBuffer mBuffer;
+         private final boolean log;
+         private final boolean reportInflow;
+         private final boolean reportOutflow;
+
+         // Both are null when neither inflow nor outflow reporting is requested.
+         private final IFeedMessageService messageService;
+         private final FeedReportMessage message;
+
+         /**
+          * @param mBuffer
+          *            buffer whose rates are sampled
+          * @param log
+          *            whether each sample is written to the log
+          * @param reportInflow
+          *            whether the inflow rate is sent as a report message
+          * @param reportOutflow
+          *            whether the outflow rate is sent as a report message
+          */
+         public LogInputOutputRateTask(MonitoredBuffer mBuffer, boolean log, boolean reportInflow, boolean reportOutflow) {
+             this.mBuffer = mBuffer;
+             this.log = log;
+             this.reportInflow = reportInflow;
+             this.reportOutflow = reportOutflow;
+             if (reportInflow || reportOutflow) {
+                 ValueType vType = reportInflow ? ValueType.INFLOW_RATE : ValueType.OUTFLOW_RATE;
+                 messageService = mBuffer.getInputHandler().getFeedManager().getFeedMessageService();
+                 message = new FeedReportMessage(mBuffer.getInputHandler().getConnectionId(), mBuffer.getRuntimeId(),
+                         vType, 0);
+             } else {
+                 messageService = null;
+                 message = null;
+             }
+
+         }
+
+         /**
+          * Samples the buffer, logs the sample when logging is enabled and sends the
+          * selected rate when reporting is enabled.
+          */
+         @Override
+         public void run() {
+             int pendingWork = mBuffer.getWorkSize();
+             int outflowRate = mBuffer.getOutflowRate();
+             int inflowRate = mBuffer.getInflowRate();
+             if (log) {
+                 if (LOGGER.isLoggable(Level.INFO)) {
+                     LOGGER.info(mBuffer.getRuntimeId() + " " + "Inflow rate:" + inflowRate + " Outflow Rate:"
+                             + outflowRate + " Pending Work " + pendingWork);
+                 }
+             }
+             if (reportInflow) {
+                 message.reset(inflowRate);
+             } else if (reportOutflow) {
+                 message.reset(outflowRate);
+             } else {
+                 // Log-only task: messageService and message are null, so there is nothing to send.
+                 return;
+             }
+             messageService.sendMessage(message);
+         }
+     }
+
+     /**
+      * Task that watches the amount of pending work in a monitored buffer and raises
+      * frame events when it crosses the configured threshold or drains again.
+      */
+     public static class MonitorInputQueueLengthTimerTask extends TimerTask {
+
+         // Pending work must fall to this fraction of the threshold before PENDING_WORK_DONE is raised.
+         private static final double PENDING_WORK_DONE_RATIO = 0.5;
+
+         private final MonitoredBuffer mBuffer;
+         private final IFrameEventCallback callback;
+         private final int pendingWorkThreshold;
+         private final int maxSuccessiveThresholdPeriods;
+         private FrameEvent lastEvent = FrameEvent.NO_OP;
+         private int pendingWorkExceedCount = 0;
+
+         /**
+          * @param mBuffer
+          *            buffer whose pending work is monitored; also the source of the feed properties
+          * @param callback
+          *            receiver of the frame events raised by this task
+          */
+         public MonitorInputQueueLengthTimerTask(MonitoredBuffer mBuffer, IFrameEventCallback callback) {
+             this.mBuffer = mBuffer;
+             this.callback = callback;
+             AsterixFeedProperties props = mBuffer.getInputHandler().getFeedManager().getAsterixFeedProperties();
+             pendingWorkThreshold = props.getPendingWorkThreshold();
+             maxSuccessiveThresholdPeriods = props.getMaxSuccessiveThresholdPeriod();
+         }
+
+         /**
+          * Does nothing while the buffer is processing spillage or backlog. Otherwise the
+          * action depends on the last event raised by this task.
+          */
+         @Override
+         public void run() {
+             int pendingWork = mBuffer.getWorkSize();
+             if (mBuffer.getMode().equals(Mode.PROCESS_SPILL) || mBuffer.getMode().equals(Mode.PROCESS_BACKLOG)) {
+                 return;
+             }
+
+             switch (lastEvent) {
+                 case NO_OP:
+                 case PENDING_WORK_DONE:
+                 case FINISHED_PROCESSING_SPILLAGE:
+                     if (pendingWork > pendingWorkThreshold) {
+                         // NOTE(review): the counter is not reset when a run finds the pending work
+                         // below the threshold, so violations need not be strictly successive - confirm intent.
+                         pendingWorkExceedCount++;
+                         if (pendingWorkExceedCount > maxSuccessiveThresholdPeriods) {
+                             pendingWorkExceedCount = 0;
+                             lastEvent = FrameEvent.PENDING_WORK_THRESHOLD_REACHED;
+                             callback.frameEvent(lastEvent);
+                         }
+                     } else if (pendingWork == 0 && mBuffer.getMode().equals(Mode.SPILL)) {
+                         lastEvent = FrameEvent.PENDING_WORK_DONE;
+                         callback.frameEvent(lastEvent);
+                     }
+                     break;
+                 case PENDING_WORK_THRESHOLD_REACHED:
+                     if (((pendingWork * 1.0) / pendingWorkThreshold) <= PENDING_WORK_DONE_RATIO) {
+                         lastEvent = FrameEvent.PENDING_WORK_DONE;
+                         callback.frameEvent(lastEvent);
+                     }
+                     break;
+                 case FINISHED_PROCESSING:
+                     break;
+
+             }
+         }
+     }
+
+     /**
+      * A timer task to measure and compare the processing rate and inflow rate
+      * to look for possibility to scale-in, that is reduce the degree of cardinality
+      * of the compute operator.
+      */
+     public static class MonitoreProcessRateTimerTask extends TimerTask {
+
+         // A scale-in is proposed only if it removes at least this fraction of the partitions.
+         private static final double MIN_SCALE_IN_FRACTION = 0.25;
+
+         private final MonitoredBuffer mBuffer;
+         private final IFeedManager feedManager;
+         private int nPartitions;
+         private ScaleInReportMessage sMessage;
+         // True from the moment a proposal is sent until setNumberOfPartitions is called.
+         private boolean proposedChange;
+
+         /**
+          * @param mBuffer
+          *            buffer whose inflow and processing rates are compared
+          * @param feedManager
+          *            source of the feed message service used to send the proposal
+          * @param connectionId
+          *            feed connection named in the scale-in report
+          * @param nPartitions
+          *            current number of partitions
+          */
+         public MonitoreProcessRateTimerTask(MonitoredBuffer mBuffer, IFeedManager feedManager,
+                 FeedConnectionId connectionId, int nPartitions) {
+             this.mBuffer = mBuffer;
+             this.feedManager = feedManager;
+             this.nPartitions = nPartitions;
+             this.sMessage = new ScaleInReportMessage(connectionId, FeedRuntimeType.COMPUTE, 0, 0);
+             this.proposedChange = false;
+         }
+
+         public int getNumberOfPartitions() {
+             return nPartitions;
+         }
+
+         /**
+          * Records the new number of partitions and re-enables scale-in proposals.
+          *
+          * @param nPartitions
+          *            the number of partitions now in effect
+          */
+         public void setNumberOfPartitions(int nPartitions) {
+             this.nPartitions = nPartitions;
+             proposedChange = false;
+             if (LOGGER.isLoggable(Level.INFO)) {
+                 LOGGER.info("Reset the number of partitions for " + mBuffer.getRuntimeId() + " to " + nPartitions);
+             }
+         }
+
+         /**
+          * Proposes a scale-in when the inflow rate is below the processing rate and the
+          * partition count that would suffice is at least {@link #MIN_SCALE_IN_FRACTION}
+          * smaller than the current one. No further proposal is made while one is pending.
+          */
+         @Override
+         public void run() {
+             if (!proposedChange) {
+                 int inflowRate = mBuffer.getInflowRate();
+                 int procRate = mBuffer.getProcessingRate();
+                 if (inflowRate > 0 && procRate > 0) {
+                     if (inflowRate < procRate) {
+                         // Multiply in long so that a large partition count times a large rate cannot overflow int.
+                         int possibleCardinality = (int) Math.ceil(((long) nPartitions * inflowRate)
+                                 / (double) procRate);
+                         if (possibleCardinality < nPartitions
+                                 && ((((nPartitions - possibleCardinality) * 1.0) / nPartitions) >= MIN_SCALE_IN_FRACTION)) {
+                             sMessage.reset(nPartitions, possibleCardinality);
+                             feedManager.getFeedMessageService().sendMessage(sMessage);
+                             proposedChange = true;
+                             if (LOGGER.isLoggable(Level.INFO)) {
+                                 LOGGER.info("Proposed scale-in " + sMessage);
+                             }
+                         }
+                     } else {
+                         if (LOGGER.isLoggable(Level.INFO)) {
+                             LOGGER.info("Inflow Rate (" + inflowRate + ") exceeds Processing Rate" + " (" + procRate
+                                     + ")");
+                         }
+                     }
+                 }
+             } else {
+                 if (LOGGER.isLoggable(Level.WARNING)) {
+                     LOGGER.warning("Waiting for earlier proposal to scale in to be applied");
+                 }
+             }
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java
new file mode 100644
index 0000000..66e5a40
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoad.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+public class NodeLoad implements Comparable<NodeLoad> {
+
+ private final String nodeId;
+
+ private int nRuntimes;
+
+ public NodeLoad(String nodeId) {
+ this.nodeId = nodeId;
+ this.nRuntimes = 0;
+ }
+
+ public void addLoad() {
+ nRuntimes++;
+ }
+
+ public void removeLoad(FeedRuntimeType runtimeType) {
+ nRuntimes--;
+ }
+
+ @Override
+ public int compareTo(NodeLoad o) {
+ if (this == o) {
+ return 0;
+ }
+ return nRuntimes - o.getnRuntimes();
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public int getnRuntimes() {
+ return nRuntimes;
+ }
+
+ public void setnRuntimes(int nRuntimes) {
+ this.nRuntimes = nRuntimes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java
new file mode 100644
index 0000000..8257143
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/NodeLoadReport.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class NodeLoadReport implements Comparable<NodeLoadReport> {
+
+ private final String nodeId;
+ private float cpuLoad;
+ private double usedHeap;
+ private int nRuntimes;
+
+ public NodeLoadReport(String nodeId, float cpuLoad, float usedHeap, int nRuntimes) {
+ this.nodeId = nodeId;
+ this.cpuLoad = cpuLoad;
+ this.usedHeap = usedHeap;
+ this.nRuntimes = nRuntimes;
+ }
+
+ public static NodeLoadReport read(JSONObject obj) throws JSONException {
+ NodeLoadReport r = new NodeLoadReport(obj.getString(FeedConstants.MessageConstants.NODE_ID),
+ (float) obj.getDouble(FeedConstants.MessageConstants.CPU_LOAD),
+ (float) obj.getDouble(FeedConstants.MessageConstants.HEAP_USAGE),
+ obj.getInt(FeedConstants.MessageConstants.N_RUNTIMES));
+ return r;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof NodeLoadReport)) {
+ return false;
+ }
+ return ((NodeLoadReport) o).nodeId.equals(nodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return nodeId.hashCode();
+ }
+
+ @Override
+ public int compareTo(NodeLoadReport o) {
+ if (nRuntimes != o.getnRuntimes()) {
+ return nRuntimes - o.getnRuntimes();
+ } else {
+ return (int) (this.cpuLoad - ((NodeLoadReport) o).cpuLoad);
+ }
+ }
+
+ public float getCpuLoad() {
+ return cpuLoad;
+ }
+
+ public void setCpuLoad(float cpuLoad) {
+ this.cpuLoad = cpuLoad;
+ }
+
+ public double getUsedHeap() {
+ return usedHeap;
+ }
+
+ public void setUsedHeap(double usedHeap) {
+ this.usedHeap = usedHeap;
+ }
+
+ public int getnRuntimes() {
+ return nRuntimes;
+ }
+
+ public void setnRuntimes(int nRuntimes) {
+ this.nRuntimes = nRuntimes;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+}