You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:12 UTC
[15/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java
new file mode 100644
index 0000000..9567307
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * An {@link IFrameTupleAccessor} that forwards every accessor call to a wrapped
+ * {@link FrameTupleAccessor} and additionally exposes the feed intake partition
+ * stored inside each tuple's record.
+ * <p>
+ * The number of open fields is read once, from tuple 0 of the buffer held by the
+ * wrapped accessor at construction time. {@link #reset(ByteBuffer)} only forwards
+ * to the wrapped accessor and does not re-read that count.
+ * <p>
+ * NOTE(review): the literal byte offsets used below (6, 4, 8, 2, 1) presumably
+ * mirror the serialized record layout produced upstream -- confirm against the
+ * record serializer before changing them.
+ */
+public class FeedFrameTupleAccessor implements IFrameTupleAccessor {
+
+    private final FrameTupleAccessor frameAccessor;
+    private final int numOpenFields;
+
+    /**
+     * @param frameAccessor
+     *            the accessor to delegate to; its current buffer is read immediately
+     *            (tuple 0) to determine the number of open fields
+     */
+    public FeedFrameTupleAccessor(FrameTupleAccessor frameAccessor) {
+        this.frameAccessor = frameAccessor;
+        int base = recordStart(0);
+        ByteBuffer buf = frameAccessor.getBuffer();
+        int openPartOffset = buf.getInt(base + 6);
+        this.numOpenFields = buf.getInt(base + openPartOffset);
+    }
+
+    /**
+     * Reads the intake partition value embedded in the record of the given tuple.
+     *
+     * @param tupleIndex
+     *            index of the tuple within the current frame
+     * @return the int found at the computed partition offset inside the record
+     */
+    public int getFeedIntakePartition(int tupleIndex) {
+        int base = recordStart(tupleIndex);
+        ByteBuffer buf = frameAccessor.getBuffer();
+        int openPartOffset = buf.getInt(base + 6);
+        // Bytes skipped after the open-part offset before the partition value is reached.
+        int skip = 4 + 8 * numOpenFields + StatisticsConstants.INTAKE_PARTITION.length() + 2 + 1;
+        return buf.getInt(base + openPartOffset + skip);
+    }
+
+    /** Offset of the first byte after the field slots of the given tuple. */
+    private int recordStart(int tupleIndex) {
+        return frameAccessor.getTupleStartOffset(tupleIndex) + frameAccessor.getFieldSlotsLength();
+    }
+
+    // ---- plain delegation to the wrapped accessor ----
+
+    @Override
+    public int getFieldCount() {
+        return frameAccessor.getFieldCount();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return frameAccessor.getFieldSlotsLength();
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldEndOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldLength(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return frameAccessor.getTupleEndOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return frameAccessor.getTupleStartOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleCount() {
+        return frameAccessor.getTupleCount();
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return frameAccessor.getBuffer();
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        frameAccessor.reset(buffer);
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        return frameAccessor.getTupleLength(tupleIndex);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java
new file mode 100644
index 0000000..927d10d
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java
@@ -0,0 +1,84 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Static helpers that build a new frame out of a subset of the tuples of an
+ * existing frame.
+ */
+public class FeedFrameUtil {
+
+    /**
+     * Builds a frame holding every tuple that comes after {@code tupleIndex}.
+     *
+     * @param ctx
+     *            task context used to allocate the new frame
+     * @param tupleIndex
+     *            index of the last tuple to leave out; copying starts at {@code tupleIndex + 1}
+     * @param fta
+     *            accessor positioned on the source frame
+     * @return the buffer of the newly populated frame
+     * @throws HyracksDataException
+     *             if allocating the frame or appending a tuple fails
+     */
+    public static ByteBuffer getSlicedFrame(IHyracksTaskContext ctx, int tupleIndex, FrameTupleAccessor fta)
+            throws HyracksDataException {
+        FrameTupleAppender appender = new FrameTupleAppender();
+        IFrame slicedFrame = new VSizeFrame(ctx);
+        appender.reset(slicedFrame, true);
+        int totalTuples = fta.getTupleCount();
+        for (int ti = tupleIndex + 1; ti < totalTuples; ti++) {
+            appender.append(fta, ti);
+        }
+        return slicedFrame.getBuffer();
+    }
+
+    /**
+     * Builds a frame holding a uniformly chosen random sample (without replacement)
+     * of the tuples of the source frame. Tuples are appended in the order they are
+     * drawn, not in their original order.
+     *
+     * @param ctx
+     *            task context used to allocate the new frame
+     * @param fta
+     *            accessor positioned on the source frame
+     * @param sampleSize
+     *            number of tuples requested; when it exceeds the tuple count, every
+     *            tuple is returned exactly once
+     * @return the buffer of the newly populated frame
+     * @throws HyracksDataException
+     *             if allocating the frame or appending a tuple fails
+     */
+    public static ByteBuffer getSampledFrame(IHyracksTaskContext ctx, FrameTupleAccessor fta, int sampleSize)
+            throws HyracksDataException {
+        NChooseKIterator it = new NChooseKIterator(fta.getTupleCount(), sampleSize);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        IFrame sampledFrame = new VSizeFrame(ctx);
+        appender.reset(sampledFrame, true);
+        while (it.hasNext()) {
+            appender.append(fta, it.next());
+        }
+        return sampledFrame.getBuffer();
+    }
+
+    /**
+     * Yields up to {@code k} distinct indices drawn at random from {@code [0, n)}.
+     */
+    private static class NChooseKIterator {
+
+        private final int n;
+        private final int k;
+        private final BitSet bSet;
+        private final Random random;
+
+        private int traversed = 0;
+
+        public NChooseKIterator(int n, int k) {
+            this.n = n;
+            // Never promise more indices than exist; otherwise next() would spin
+            // forever looking for a set bit once all of them have been handed out.
+            this.k = Math.min(k, n);
+            this.bSet = new BitSet(n);
+            // BitSet.set(from, to) excludes 'to', so the upper bound must be n
+            // (not n - 1) for the last index to be eligible.
+            bSet.set(0, n);
+            this.random = new Random();
+        }
+
+        public boolean hasNext() {
+            return traversed < k;
+        }
+
+        /**
+         * @return the next not-yet-returned index, or -1 once k indices have been produced
+         */
+        public int next() {
+            if (!hasNext()) {
+                return -1;
+            }
+            traversed++;
+            // hasNext() guarantees n >= 1 and at least one remaining set bit here.
+            int pos = bSet.nextSetBit(random.nextInt(n));
+            if (pos < 0) {
+                // Nothing at or after the random start: wrap around to the beginning.
+                pos = bSet.nextSetBit(0);
+            }
+            bSet.clear(pos);
+            return pos;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java
new file mode 100644
index 0000000..1e38f70
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.Serializable;
+
+/**
+ * Identifies a data feed by the pair (dataverse, feed name).
+ * <p>
+ * Two identifiers are equal when both components are equal; the hash code is
+ * derived from the {@code dataverse.feedName} string form.
+ */
+public class FeedId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String dataverse;
+    private final String feedName;
+
+    /**
+     * @param dataverse
+     *            name of the dataverse the feed belongs to
+     * @param feedName
+     *            name of the feed
+     */
+    public FeedId(String dataverse, String feedName) {
+        this.dataverse = dataverse;
+        this.feedName = feedName;
+    }
+
+    /** @return the dataverse component of this identifier */
+    public String getDataverse() {
+        return dataverse;
+    }
+
+    /** @return the feed-name component of this identifier */
+    public String getFeedName() {
+        return feedName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // instanceof is false for null, so this also rejects a null argument.
+        if (!(o instanceof FeedId)) {
+            return false;
+        }
+        if (this == o) {
+            return true;
+        }
+        FeedId other = (FeedId) o;
+        return other.getFeedName().equals(feedName) && other.getDataverse().equals(dataverse);
+    }
+
+    @Override
+    public int hashCode() {
+        String rep = toString();
+        return rep.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder().append(dataverse).append(".").append(feedName).toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java
new file mode 100644
index 0000000..8ed3cf1
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Job information for the intake job of a feed.
+ * <p>
+ * The job type handed to the superclass is always {@code INTAKE}. This class keeps
+ * its own final reference to the job specification, and {@link #getSpec()} returns
+ * that reference.
+ */
+public class FeedIntakeInfo extends FeedJobInfo {
+
+    private final FeedId feedId;
+    private final IFeedJoint intakeFeedJoint;
+    private final JobSpecification spec;
+    private List<String> intakeLocation;
+
+    /**
+     * @param jobId
+     *            id of the intake job
+     * @param state
+     *            initial state of the job
+     * @param jobType
+     *            not used; the job type is fixed to {@code INTAKE}
+     * @param feedId
+     *            the feed being ingested
+     * @param intakeFeedJoint
+     *            the feed joint associated with the intake job
+     * @param spec
+     *            the job specification of the intake job
+     */
+    public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
+            JobSpecification spec) {
+        super(jobId, state, JobType.INTAKE, spec);
+        this.spec = spec;
+        this.intakeFeedJoint = intakeFeedJoint;
+        this.feedId = feedId;
+    }
+
+    /** @return the feed this intake job belongs to */
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    /** @return the feed joint associated with the intake job */
+    public IFeedJoint getIntakeFeedJoint() {
+        return intakeFeedJoint;
+    }
+
+    /** @return the job specification supplied at construction */
+    public JobSpecification getSpec() {
+        return spec;
+    }
+
+    /** @return the intake locations, or {@code null} if none have been set */
+    public List<String> getIntakeLocation() {
+        return intakeLocation;
+    }
+
+    /**
+     * @param intakeLocation
+     *            the intake locations to record
+     */
+    public void setIntakeLocation(List<String> intakeLocation) {
+        this.intakeLocation = intakeLocation;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java
new file mode 100644
index 0000000..c50ca43
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Holds the id, type, state and specification of a job associated with a feed.
+ */
+public class FeedJobInfo {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
+
+    /** Kind of feed job. */
+    public enum JobType {
+        INTAKE,
+        FEED_CONNECT
+    }
+
+    /** State of a feed job. */
+    public enum FeedJobState {
+        CREATED,
+        ACTIVE,
+        UNDER_RECOVERY,
+        ENDED
+    }
+
+    protected final JobId jobId;
+    protected final JobType jobType;
+    protected FeedJobState state;
+    protected JobSpecification spec;
+
+    /**
+     * @param jobId
+     *            id of the job
+     * @param state
+     *            initial state
+     * @param jobType
+     *            kind of feed job
+     * @param spec
+     *            specification of the job
+     */
+    public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
+        this.jobId = jobId;
+        this.jobType = jobType;
+        this.state = state;
+        this.spec = spec;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public JobType getJobType() {
+        return jobType;
+    }
+
+    public FeedJobState getState() {
+        return state;
+    }
+
+    /**
+     * Records the new state and reports the transition at INFO level.
+     *
+     * @param state
+     *            the state to move to
+     */
+    public void setState(FeedJobState state) {
+        this.state = state;
+        if (!LOGGER.isLoggable(Level.INFO)) {
+            return;
+        }
+        LOGGER.info(this + " is in " + state + " state.");
+    }
+
+    public JobSpecification getSpec() {
+        return spec;
+    }
+
+    public void setSpec(JobSpecification spec) {
+        this.spec = spec;
+    }
+
+    @Override
+    public String toString() {
+        return jobId + " [" + jobType + "]";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java
new file mode 100644
index 0000000..145ac32
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Key that uniquely identifies a feed joint. A feed joint is a logical point on a
+ * feed ingestion pipeline where the tuples flowing by constitute the feed; it
+ * behaves like a network tap, letting the data be routed along several paths.
+ * <p>
+ * Equality and hashing are based solely on the string form, which is the feed id
+ * followed by the applied functions, all separated by {@code ':'}.
+ */
+public class FeedJointKey {
+
+    private final FeedId primaryFeedId;
+    private final List<String> appliedFunctions;
+    private final String stringRep;
+
+    /**
+     * @param feedId
+     *            id of the primary feed
+     * @param appliedFunctions
+     *            functions applied up to this joint; the list is stored as given (not copied)
+     */
+    public FeedJointKey(FeedId feedId, List<String> appliedFunctions) {
+        this.primaryFeedId = feedId;
+        this.appliedFunctions = appliedFunctions;
+        this.stringRep = feedId + ":" + StringUtils.join(appliedFunctions, ':');
+    }
+
+    /** @return the id of the primary feed */
+    public FeedId getFeedId() {
+        return primaryFeedId;
+    }
+
+    /** @return the list of applied functions passed at construction */
+    public List<String> getAppliedFunctions() {
+        return appliedFunctions;
+    }
+
+    /** @return the string form computed at construction */
+    public String getStringRep() {
+        return stringRep;
+    }
+
+    @Override
+    public final String toString() {
+        return stringRep;
+    }
+
+    @Override
+    public int hashCode() {
+        return stringRep.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // instanceof is false for null, so a null argument yields false.
+        return this == o || (o instanceof FeedJointKey && stringRep.equals(((FeedJointKey) o).stringRep));
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java
new file mode 100644
index 0000000..88e900d
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent.Type;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+
+/**
+ * Hands out feed memory components against a fixed budget expressed in frames and
+ * tracks how many frames are currently committed.
+ * <p>
+ * All methods that read or change the committed count are synchronized on this
+ * instance.
+ */
+public class FeedMemoryManager implements IFeedMemoryManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMemoryManager.class.getName());
+    private static final int ALLOCATION_INCREMENT = 10;
+
+    private final AtomicInteger componentId = new AtomicInteger(0);
+    private final String nodeId;
+    private final int budget;
+    private final int frameSize;
+
+    private int committed;
+
+    /**
+     * @param nodeId
+     *            id of the node this manager belongs to (used in {@link #toString()})
+     * @param feedProperties
+     *            source of the global memory budget
+     * @param frameSize
+     *            size of one frame; the budget is divided by it to obtain a frame count
+     */
+    public FeedMemoryManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) {
+        this.nodeId = nodeId;
+        this.frameSize = frameSize;
+        // Divide first, then narrow: casting the raw budget before the division
+        // would truncate a budget that does not fit in an int.
+        budget = (int) (feedProperties.getMemoryComponentGlobalBudget() / frameSize);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Feed Memory budget " + budget + " frames (frame size=" + frameSize + ")");
+        }
+    }
+
+    /**
+     * Creates a memory component of the requested type if its start size still fits
+     * in the budget. The start size is added to the committed count only when the
+     * component is actually created.
+     *
+     * @param type
+     *            kind of component to create
+     * @return the new component, or {@code null} when the budget does not allow it
+     */
+    @Override
+    public synchronized IFeedMemoryComponent getMemoryComponent(Type type) {
+        IFeedMemoryComponent memoryComponent = null;
+        boolean valid = false;
+        switch (type) {
+            case COLLECTION:
+                valid = committed + START_COLLECTION_SIZE <= budget;
+                if (valid) {
+                    memoryComponent = new FrameCollection(componentId.incrementAndGet(), this, START_COLLECTION_SIZE);
+                    // Account for the initial allocation; release subtracts the
+                    // component's total allocation, so it must be counted here.
+                    committed += START_COLLECTION_SIZE;
+                }
+                break;
+            case POOL:
+                valid = committed + START_POOL_SIZE <= budget;
+                if (valid) {
+                    memoryComponent = new DataBucketPool(componentId.incrementAndGet(), this, START_POOL_SIZE,
+                            frameSize);
+                    // Only charge the budget when the pool was really created.
+                    committed += START_POOL_SIZE;
+                }
+                break;
+        }
+        if (!valid) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to allocate memory component of type " + type);
+            }
+        }
+        return valid ? memoryComponent : null;
+    }
+
+    /**
+     * Grows the given component by a fixed increment if the budget allows it.
+     *
+     * @param memoryComponent
+     *            the component to expand
+     * @return {@code true} if the component was expanded, {@code false} if the
+     *         increment would exceed the budget
+     */
+    @Override
+    public synchronized boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent) {
+        if (committed + ALLOCATION_INCREMENT > budget) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Memory budget " + budget + " is exhausted. Space left: " + (budget - committed)
+                        + " frames.");
+            }
+            return false;
+        }
+        memoryComponent.expand(ALLOCATION_INCREMENT);
+        committed += ALLOCATION_INCREMENT;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Expanded memory component " + memoryComponent + " by " + ALLOCATION_INCREMENT + " " + this);
+        }
+        return true;
+    }
+
+    /**
+     * Returns the component's total allocation to the budget and resets the component.
+     *
+     * @param memoryComponent
+     *            the component being released
+     */
+    @Override
+    public synchronized void releaseMemoryComponent(IFeedMemoryComponent memoryComponent) {
+        // Read the allocation before reset() so the reclaimed amount is the pre-reset size.
+        int delta = memoryComponent.getTotalAllocation();
+        committed -= delta;
+        memoryComponent.reset();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Reset " + memoryComponent + " and reclaimed " + delta + " frames " + this);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "FeedMemoryManager [" + nodeId + "]" + "(" + committed + "/" + budget + ")";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
index 3d0d8f9..320e6b9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
@@ -1,149 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.common.feeds;
-import java.io.IOException;
-import java.io.InputStream;
import java.net.Socket;
-import java.net.UnknownHostException;
-import java.nio.CharBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+
/**
* Sends feed report messages on behalf of an operator instance
* to the SuperFeedManager associated with the feed.
*/
-public class FeedMessageService {
+public class FeedMessageService implements IFeedMessageService {
private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
- public static final char MessageSeparator = '|';
- private static final char EOL = (char) "\n".getBytes()[0];
-
- private final FeedConnectionId feedId;
private final LinkedBlockingQueue<String> inbox;
private final FeedMessageHandler mesgHandler;
- private final IFeedManager feedManager;
-
- public FeedMessageService(FeedConnectionId feedId, IFeedManager feedManager) {
- this.feedId = feedId;
- inbox = new LinkedBlockingQueue<String>();
- mesgHandler = new FeedMessageHandler(inbox, feedId, feedManager);
- this.feedManager = feedManager;
+ private final String nodeId;
+ private ExecutorService executor;
+
+ public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
+ this.inbox = new LinkedBlockingQueue<String>();
+ this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
+ this.nodeId = nodeId;
+ this.executor = Executors.newSingleThreadExecutor();
}
- public void start() throws UnknownHostException, IOException, Exception {
- feedManager.getFeedExecutorService(feedId).execute(mesgHandler);
+ public void start() throws Exception {
+
+ executor.execute(mesgHandler);
}
- public void stop() throws IOException {
+ public void stop() {
+ synchronized (mesgHandler.getLock()) {
+ executor.shutdownNow();
+ }
mesgHandler.stop();
}
- public void sendMessage(String message) throws IOException {
- inbox.add(message);
+ @Override
+ public void sendMessage(IFeedMessage message) {
+ try {
+ JSONObject obj = message.toJSON();
+ obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
+ inbox.add(obj.toString());
+ } catch (JSONException jse) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("JSON exception in parsing message " + message + " exception [" + jse.getMessage() + "]");
+ }
+ }
}
private static class FeedMessageHandler implements Runnable {
private final LinkedBlockingQueue<String> inbox;
- private final FeedConnectionId feedId;
- private Socket sfmSocket;
- private boolean process = true;
- private final IFeedManager feedManager;
+ private final String host;
+ private final int port;
+ private final Object lock;
- public FeedMessageHandler(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId, IFeedManager feedManager) {
+ private Socket cfmSocket;
+
+ private static final byte[] EOL = "\n".getBytes();
+
+ public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
this.inbox = inbox;
- this.feedId = feedId;
- this.feedManager = feedManager;
+ this.host = host;
+ this.port = port;
+ this.lock = new Object();
}
public void run() {
try {
- sfmSocket = obtainSFMSocket();
- if (sfmSocket != null) {
- while (process) {
+ cfmSocket = new Socket(host, port);
+ if (cfmSocket != null) {
+ while (true) {
String message = inbox.take();
- sfmSocket.getOutputStream().write(message.getBytes());
+ synchronized (lock) { // lock prevents message handler from sending incomplete message midst shutdown attempt
+ cfmSocket.getOutputStream().write(message.getBytes());
+ cfmSocket.getOutputStream().write(EOL);
+ }
}
} else {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start feed message service for " + feedId);
+ LOGGER.warning("Unable to start feed message service");
}
}
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ended feed message service for " + feedId);
- }
} catch (Exception e) {
+ e.printStackTrace();
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
}
} finally {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Stopping feed message handler");
- }
- if (sfmSocket != null) {
- try {
- sfmSocket.close();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in closing socket " + e.getMessage());
- }
- }
- }
+ stop();
}
}
public void stop() {
- process = false;
- }
-
- private Socket obtainSFMSocket() throws UnknownHostException, IOException, Exception {
- Socket sfmDirServiceSocket = null;
- SuperFeedManager sfm = feedManager.getSuperFeedManager(feedId);
- try {
- FeedRuntimeManager runtimeManager = feedManager.getFeedRuntimeManager(feedId);
- sfmDirServiceSocket = runtimeManager.createClientSocket(sfm.getHost(), sfm.getPort(),
- IFeedManager.SOCKET_CONNECT_TIMEOUT);
- if (sfmDirServiceSocket == null) {
+ if (cfmSocket != null) {
+ try {
+ cfmSocket.close();
+ } catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to connect to " + sfm.getHost() + "[" + sfm.getPort() + "]");
- }
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" Connected to Super Feed Manager service " + sfm.getHost() + " " + sfm.getPort());
- }
- while (!sfmDirServiceSocket.isConnected()) {
- Thread.sleep(2000);
- }
- InputStream in = sfmDirServiceSocket.getInputStream();
- CharBuffer buffer = CharBuffer.allocate(50);
- char ch = 0;
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
+ LOGGER.warning("Exception in closing socket " + e.getMessage());
}
- buffer.flip();
- String s = new String(buffer.array());
- int port = Integer.parseInt(s.trim());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Response from Super Feed Manager service " + port + " will connect at "
- + sfm.getHost() + " " + port);
- }
- sfmSocket = runtimeManager.createClientSocket(sfm.getHost(), port,
- IFeedManager.SOCKET_CONNECT_TIMEOUT);
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- } finally {
- if (sfmDirServiceSocket != null) {
- sfmDirServiceSocket.close();
}
}
- return sfmSocket;
}
+
+ public Object getLock() {
+ return lock;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java
new file mode 100644
index 0000000..ff943e1
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+
+/**
+ * Collects numeric reports from registered senders and exposes the aggregated
+ * metric (average or rate) per sender.
+ * <p>
+ * The sender registry and the series history are plain {@link HashMap}s, so every
+ * public method that touches them is synchronized on this instance.
+ */
+public class FeedMetricCollector implements IFeedMetricCollector {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetricCollector.class.getName());
+
+    private static final int UNKNOWN = -1;
+
+    private final String nodeId;
+    private final AtomicInteger globalSenderId = new AtomicInteger(1);
+    private final Map<Integer, Sender> senders = new HashMap<Integer, Sender>();
+    private final Map<Integer, Series> statHistory = new HashMap<Integer, Series>();
+    private final Map<String, Sender> sendersByName = new HashMap<String, Sender>();
+
+    public FeedMetricCollector(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * Registers a new sender and returns its id.
+     *
+     * @return the id to use with the other methods of this collector
+     */
+    @Override
+    public synchronized int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            ValueType valueType, MetricType metricType) {
+        Sender sender = new Sender(globalSenderId.getAndIncrement(), connectionId, runtimeId, valueType, metricType);
+        senders.put(sender.senderId, sender);
+        sendersByName.put(sender.getDisplayName(), sender);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Sender id " + sender.getSenderId() + " created for " + sender);
+        }
+        return sender.senderId;
+    }
+
+    /**
+     * Unregisters a sender and discards its history.
+     *
+     * @throws IllegalStateException
+     *             if no sender is registered under {@code senderId}
+     */
+    @Override
+    public synchronized void removeReportSender(int senderId) {
+        Sender sender = senders.get(senderId);
+        if (sender == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to remove sender Id");
+            }
+            throw new IllegalStateException("Unable to remove sender Id " + senderId + " senders " + senders);
+        }
+        statHistory.remove(senderId);
+        senders.remove(senderId);
+        // Drop the by-name entry too, but only if it still points at this sender:
+        // a later sender with the same display name may have replaced it.
+        String displayName = sender.getDisplayName();
+        if (sendersByName.get(displayName) == sender) {
+            sendersByName.remove(displayName);
+        }
+    }
+
+    /**
+     * Adds a value to the sender's series, creating the series on first use.
+     *
+     * @return always {@code true} when the sender exists
+     * @throws IllegalStateException
+     *             if no sender is registered under {@code senderId}, or its metric
+     *             type has no series implementation
+     */
+    @Override
+    public synchronized boolean sendReport(int senderId, int value) {
+        Sender sender = senders.get(senderId);
+        if (sender == null) {
+            throw new IllegalStateException("Unable to send report sender Id " + senderId + " senders " + senders);
+        }
+        Series series = statHistory.get(sender.senderId);
+        if (series == null) {
+            switch (sender.mType) {
+                case AVG:
+                    series = new SeriesAvg();
+                    break;
+                case RATE:
+                    series = new SeriesRate();
+                    break;
+                default:
+                    // Fail with a clear message instead of storing null and hitting an NPE below.
+                    throw new IllegalStateException("Unsupported metric type " + sender.mType + " for " + sender);
+            }
+            statHistory.put(sender.senderId, series);
+        }
+        series.addValue(value);
+        return true;
+    }
+
+    /**
+     * Resets the sender's series, if one exists.
+     *
+     * @throws IllegalStateException
+     *             if no sender is registered under {@code senderId}
+     */
+    @Override
+    public synchronized void resetReportSender(int senderId) {
+        Sender sender = senders.get(senderId);
+        if (sender == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Sender with id " + senderId + " not found. Unable to reset!");
+            }
+            throw new IllegalStateException("Unable to reset sender Id " + senderId + " senders " + senders);
+        }
+        Series series = statHistory.get(sender.senderId);
+        if (series != null) {
+            series.reset();
+        }
+    }
+
+    /** Registration record of one report sender; identity is the sender id. */
+    private static class Sender {
+
+        private final int senderId;
+        private final MetricType mType;
+        private final String displayName;
+
+        public Sender(int senderId, FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
+                MetricType mType) {
+            this.senderId = senderId;
+            this.mType = mType;
+            this.displayName = createDisplayName(connectionId, runtimeId, valueType);
+        }
+
+        @Override
+        public String toString() {
+            return displayName + "[" + senderId + "]" + "(" + mType + ")";
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof Sender)) {
+                return false;
+            }
+            return ((Sender) o).senderId == senderId;
+        }
+
+        @Override
+        public int hashCode() {
+            return senderId;
+        }
+
+        public static String createDisplayName(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+                ValueType valueType) {
+            return connectionId + " (" + runtimeId.getFeedRuntimeType() + " )" + "[" + runtimeId.getPartition() + "]"
+                    + "{" + valueType + "}";
+        }
+
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        public int getSenderId() {
+            return senderId;
+        }
+    }
+
+    /**
+     * @return the current metric of the sender, or -1 if the sender is unknown or
+     *         has not reported yet
+     */
+    @Override
+    public synchronized int getMetric(int senderId) {
+        return getMetric(senders.get(senderId));
+    }
+
+    /**
+     * Looks the sender up by its display name.
+     *
+     * @return the current metric of the sender, or -1 if the sender is unknown or
+     *         has not reported yet
+     */
+    @Override
+    public synchronized int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType) {
+        String displayName = Sender.createDisplayName(connectionId, runtimeId, valueType);
+        return getMetric(sendersByName.get(displayName));
+    }
+
+    /** Callers must hold this instance's monitor. */
+    private int getMetric(Sender sender) {
+        if (sender == null) {
+            return UNKNOWN;
+        }
+        Series series = statHistory.get(sender.getSenderId());
+        if (series == null) {
+            return UNKNOWN;
+        }
+
+        float result = -1;
+        switch (sender.mType) {
+            case AVG:
+                result = ((SeriesAvg) series).getAvg();
+                break;
+            case RATE:
+                result = ((SeriesRate) series).getRate();
+                break;
+        }
+        return (int) result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java
new file mode 100644
index 0000000..6e6ce75
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A utility class to access the configuration parameters of a feed ingestion policy.
+ * <p>
+ * The policy is held as a map from parameter name to its string value. Each typed accessor parses the
+ * value on every call and falls back to a default when the parameter is absent.
+ */
+public class FeedPolicyAccessor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** failure configuration **/
+    /** continue feed ingestion after a soft (runtime) failure **/
+    public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
+
+    /** log failed tuple to an asterixdb dataset for future reference **/
+    public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
+
+    /** continue feed ingestion after loss of one or more machines (hardware failure) **/
+    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
+
+    /** auto-start a loser feed when the asterixdb instance is restarted **/
+    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
+
+    /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
+    public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
+
+    /** flow control configuration **/
+    /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
+    public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
+
+    /** the maximum size of data (tuples) that can be spilled to disk **/
+    public static final String MAX_SPILL_SIZE_ON_DISK = "max.spill.size.on.disk";
+
+    /** discard tuples altogether if an operator cannot process incoming data at its arrival rate **/
+    public static final String DISCARD_ON_CONGESTION = "discard.on.congestion";
+
+    /** maximum fraction of ingested data that can be discarded **/
+    public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
+
+    /** maximum end-to-end delay/latency in persisting a tuple through the feed ingestion pipeline **/
+    public static final String MAX_DELAY_RECORD_PERSISTENCE = "max.delay.record.persistence";
+
+    /** rate limit the inflow of tuples in accordance with the maximum capacity of the pipeline **/
+    public static final String THROTTLING_ENABLED = "throttling.enabled";
+
+    /** elasticity **/
+    public static final String ELASTIC = "elastic";
+
+    /** statistics **/
+    public static final String TIME_TRACKING = "time.tracking";
+
+    /** logging of statistics **/
+    public static final String LOGGING_STATISTICS = "logging.statistics";
+
+    /** value returned by {@link #getMaxSpillOnDisk()} when no spill limit is configured **/
+    public static final long NO_LIMIT = -1;
+
+    /** the policy parameters, keyed by the constants declared above **/
+    private Map<String, String> feedPolicy;
+
+    /** Returns the underlying parameter map (not a copy). */
+    public Map<String, String> getFeedPolicy() {
+        return feedPolicy;
+    }
+
+    /**
+     * @param feedPolicy the policy parameters to read from; the map is referenced, not copied
+     */
+    public FeedPolicyAccessor(Map<String, String> feedPolicy) {
+        this.feedPolicy = feedPolicy;
+    }
+
+    /** Replaces the policy parameters this accessor reads from. */
+    public void reset(Map<String, String> feedPolicy) {
+        this.feedPolicy = feedPolicy;
+    }
+
+    /** Failure recovery/reporting **/
+
+    /** Returns the {@link #SOFT_FAILURE_LOG_DATA} setting; {@code false} when absent. */
+    public boolean logDataOnSoftFailure() {
+        return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
+    }
+
+    /** Returns the {@link #SOFT_FAILURE_CONTINUE} setting; {@code false} when absent. */
+    public boolean continueOnSoftFailure() {
+        return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
+    }
+
+    /** Returns the {@link #HARDWARE_FAILURE_CONTINUE} setting; {@code false} when absent. */
+    public boolean continueOnHardwareFailure() {
+        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
+    }
+
+    /** Returns the {@link #CLUSTER_REBOOT_AUTO_RESTART} setting; {@code false} when absent. */
+    public boolean autoRestartOnClusterReboot() {
+        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
+    }
+
+    /** Returns the {@link #AT_LEAST_ONE_SEMANTICS} setting; {@code false} when absent. */
+    public boolean atleastOnceSemantics() {
+        return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
+    }
+
+    /** flow control **/
+
+    /** Returns the {@link #SPILL_TO_DISK_ON_CONGESTION} setting; {@code false} when absent. */
+    public boolean spillToDiskOnCongestion() {
+        return getBooleanPropertyValue(SPILL_TO_DISK_ON_CONGESTION, false);
+    }
+
+    /**
+     * Returns {@code true} when a positive {@link #MAX_FRACTION_DISCARD} is configured.
+     * NOTE(review): the {@link #DISCARD_ON_CONGESTION} parameter itself is not consulted here;
+     * confirm that deriving the flag from the discard fraction is intended.
+     */
+    public boolean discardOnCongestion() {
+        return getMaxFractionDiscard() > 0;
+    }
+
+    /** Returns the {@link #THROTTLING_ENABLED} setting; {@code false} when absent. */
+    public boolean throttlingEnabled() {
+        return getBooleanPropertyValue(THROTTLING_ENABLED, false);
+    }
+
+    /** Returns the {@link #MAX_SPILL_SIZE_ON_DISK} setting; {@link #NO_LIMIT} when absent. */
+    public long getMaxSpillOnDisk() {
+        return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
+    }
+
+    /** Returns the {@link #MAX_FRACTION_DISCARD} setting; {@code 0} when absent. */
+    public float getMaxFractionDiscard() {
+        return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
+    }
+
+    /** Returns the {@link #MAX_DELAY_RECORD_PERSISTENCE} setting; {@link Long#MAX_VALUE} when absent. */
+    public long getMaxDelayRecordPersistence() {
+        return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, Long.MAX_VALUE);
+    }
+
+    /** Elasticity **/
+
+    /** Returns the {@link #ELASTIC} setting; {@code false} when absent. */
+    public boolean isElastic() {
+        return getBooleanPropertyValue(ELASTIC, false);
+    }
+
+    /** Statistics **/
+
+    /** Returns the {@link #TIME_TRACKING} setting; {@code false} when absent. */
+    public boolean isTimeTrackingEnabled() {
+        return getBooleanPropertyValue(TIME_TRACKING, false);
+    }
+
+    /** Logging of statistics **/
+
+    /** Returns the {@link #LOGGING_STATISTICS} setting; {@code false} when absent. */
+    public boolean isLoggingStatisticsEnabled() {
+        return getBooleanPropertyValue(LOGGING_STATISTICS, false);
+    }
+
+    /**
+     * Reads a boolean parameter.
+     *
+     * @param key the parameter name
+     * @param defValue the value to return when the parameter is absent
+     * @return the parsed value ({@code true} only for the text "true", ignoring case), or
+     *         {@code defValue} when the parameter is absent
+     */
+    private boolean getBooleanPropertyValue(String key, boolean defValue) {
+        String v = feedPolicy.get(key);
+        // Honour the caller-supplied default instead of a hard-coded false.
+        return v == null ? defValue : Boolean.parseBoolean(v);
+    }
+
+    /**
+     * Reads a long parameter.
+     *
+     * @param key the parameter name
+     * @param defValue the value to return when the parameter is absent
+     * @return the parsed value, or {@code defValue} when the parameter is absent
+     * @throws NumberFormatException if the configured text is not a valid long
+     */
+    private long getLongPropertyValue(String key, long defValue) {
+        String v = feedPolicy.get(key);
+        return v != null ? Long.parseLong(v) : defValue;
+    }
+
+    /**
+     * Reads a float parameter.
+     *
+     * @param key the parameter name
+     * @param defValue the value to return when the parameter is absent
+     * @return the parsed value, or {@code defValue} when the parameter is absent
+     * @throws NumberFormatException if the configured text is not a valid float
+     */
+    private float getFloatPropertyValue(String key, float defValue) {
+        String v = feedPolicy.get(key);
+        return v != null ? Float.parseFloat(v) : defValue;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java
deleted file mode 100644
index cda56ae..0000000
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package edu.uci.ics.asterix.common.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager.FeedReportMessageType;
-
-public class FeedReport implements Comparable {
-
- private FeedConnectionId feedId;
- private FeedReportMessageType reportType;
- private int partition = -1;
- private FeedRuntimeType runtimeType;
- private long value = -1;
- private String[] representation;
-
- public FeedReport() {
- }
-
- public FeedReport(String message) {
- representation = message.split("\\|");
- }
-
- public void reset(String message) {
- representation = message.split("\\|");
- reportType = null;
- feedId = null;
- runtimeType = null;
- partition = -1;
- value = -1;
- }
-
- @Override
- public String toString() {
- return getFeedId() + " " + getReportType() + " " + getPartition() + " " + getRuntimeType() + " " + getValue();
- }
-
- public FeedConnectionId getFeedId() {
- if (feedId == null) {
- String feedIdRep = representation[1];
- String[] feedIdComp = feedIdRep.split(":");
- feedId = new FeedConnectionId(feedIdComp[0], feedIdComp[1], feedIdComp[2]);
- }
- return feedId;
- }
-
- public FeedReportMessageType getReportType() {
- if (reportType == null) {
- reportType = FeedReportMessageType.valueOf(representation[0].toUpperCase());
- }
- return reportType;
- }
-
- public int getPartition() {
- if (partition < 0) {
- partition = Integer.parseInt(representation[3]);
- }
- return partition;
- }
-
- public FeedRuntimeType getRuntimeType() {
- if (runtimeType == null) {
- runtimeType = FeedRuntimeType.valueOf(representation[2].toUpperCase());
- }
- return runtimeType;
- }
-
- public long getValue() {
- if (value < 0) {
- value = Long.parseLong(representation[4]);
- }
- return value;
- }
-
- public String[] getRepresentation() {
- return representation;
- }
-
- @Override
- public int compareTo(Object o) {
- if (!(o instanceof FeedReport)) {
- throw new IllegalArgumentException("Incorrect operand type " + o);
- }
-
- FeedReport other = (FeedReport) o;
- if (!other.getReportType().equals(getReportType())) {
- throw new IllegalArgumentException("Incorrect operand type " + o);
- }
-
- int returnValue = 0;
-
- switch (getReportType()) {
- case CONGESTION:
- returnValue = ranking.get(getRuntimeType()) - ranking.get(other.getRuntimeType());
- break;
-
- case THROUGHPUT:
- returnValue = (int) (other.getValue() - getValue());
- break;
- }
-
- return returnValue;
- }
-
- private static Map<FeedRuntimeType, Integer> ranking = populateRanking();
-
- private static Map<FeedRuntimeType, Integer> populateRanking() {
- Map<FeedRuntimeType, Integer> ranking = new HashMap<FeedRuntimeType, Integer>();
- ranking.put(FeedRuntimeType.INGESTION, 1);
- ranking.put(FeedRuntimeType.COMPUTE, 2);
- ranking.put(FeedRuntimeType.STORAGE, 3);
- ranking.put(FeedRuntimeType.COMMIT, 4);
- return ranking;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
index 88e1db5..47f373b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
@@ -14,155 +14,57 @@
*/
package edu.uci.ics.asterix.common.feeds;
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-public class FeedRuntime {
-
- public enum FeedRuntimeType {
- INGESTION,
- COMPUTE,
- STORAGE,
- COMMIT
- }
+public class FeedRuntime implements IFeedRuntime {
- /** A unique identifier */
- protected final FeedRuntimeId feedRuntimeId;
+ /** A unique identifier for the runtime **/
+ protected final FeedRuntimeId runtimeId;
- /** The runtime state: @see FeedRuntimeState */
- protected FeedRuntimeState runtimeState;
+ /** The output frame writer associated with the runtime **/
+ protected IFrameWriter frameWriter;
- public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType) {
- this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, partition);
- }
+ /** The pre-processor associated with the runtime **/
+ protected FeedRuntimeInputHandler inputHandler;
- public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId) {
- this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
+ public FeedRuntime(FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter) {
+ this.runtimeId = runtimeId;
+ this.frameWriter = frameWriter;
+ this.inputHandler = inputHandler;
}
- public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId,
- FeedRuntimeState runtimeState) {
- this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
- this.runtimeState = runtimeState;
+ public void setFrameWriter(IFeedOperatorOutputSideHandler frameWriter) {
+ this.frameWriter = frameWriter;
}
@Override
- public String toString() {
- return feedRuntimeId + " " + "runtime state present ? " + (runtimeState != null);
+ public FeedRuntimeId getRuntimeId() {
+ return runtimeId;
}
- public static class FeedRuntimeState {
-
- private ByteBuffer frame;
- private IFrameWriter frameWriter;
- private Exception exception;
-
- public FeedRuntimeState(ByteBuffer frame, IFrameWriter frameWriter, Exception exception) {
- this.frame = frame;
- this.frameWriter = frameWriter;
- this.exception = exception;
- }
-
- public ByteBuffer getFrame() {
- return frame;
- }
-
- public void setFrame(ByteBuffer frame) {
- this.frame = frame;
- }
-
- public IFrameWriter getFrameWriter() {
- return frameWriter;
- }
-
- public void setFrameWriter(IFrameWriter frameWriter) {
- this.frameWriter = frameWriter;
- }
-
- public Exception getException() {
- return exception;
- }
-
- public void setException(Exception exception) {
- this.exception = exception;
- }
-
+ @Override
+ public IFrameWriter getFeedFrameWriter() {
+ return frameWriter;
}
- public static class FeedRuntimeId {
-
- public static final String DEFAULT_OPERATION_ID = "N/A";
- private final FeedRuntimeType feedRuntimeType;
- private final String operandId;
- private final FeedConnectionId feedId;
- private final int partition;
- private final int hashCode;
-
- public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, String operandId, int partition) {
- this.feedRuntimeType = runtimeType;
- this.operandId = operandId;
- this.feedId = feedId;
- this.partition = partition;
- this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode();
- }
-
- public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, int partition) {
- this.feedRuntimeType = runtimeType;
- this.operandId = DEFAULT_OPERATION_ID;
- this.feedId = feedId;
- this.partition = partition;
- this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode();
- }
-
- @Override
- public String toString() {
- return feedId + "[" + partition + "]" + " " + feedRuntimeType + "(" + operandId + ")";
- }
-
- @Override
- public int hashCode() {
- return hashCode;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof FeedRuntimeId) {
- FeedRuntimeId oid = ((FeedRuntimeId) o);
- return oid.getFeedId().equals(feedId) && oid.getFeedRuntimeType().equals(feedRuntimeType)
- && oid.getOperandId().equals(operandId) && oid.getPartition() == partition;
- }
- return false;
- }
-
- public FeedRuntimeType getFeedRuntimeType() {
- return feedRuntimeType;
- }
-
- public FeedConnectionId getFeedId() {
- return feedId;
- }
-
- public String getOperandId() {
- return operandId;
- }
-
- public int getPartition() {
- return partition;
- }
-
+ @Override
+ public String toString() {
+ return runtimeId.toString();
}
- public FeedRuntimeState getRuntimeState() {
- return runtimeState;
+ @Override
+ public FeedRuntimeInputHandler getInputHandler() {
+ return inputHandler;
}
- public void setRuntimeState(FeedRuntimeState runtimeState) {
- this.runtimeState = runtimeState;
+ public Mode getMode() {
+ return inputHandler != null ? inputHandler.getMode() : Mode.PROCESS;
}
- public FeedRuntimeId getFeedRuntimeId() {
- return feedRuntimeId;
+ public void setMode(Mode mode) {
+ this.inputHandler.setMode(mode);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java
new file mode 100644
index 0000000..098cc50
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.Serializable;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * Identifies a feed runtime by the combination of its runtime type, partition and operand id.
+ * Instances are immutable and usable as map keys.
+ */
+public class FeedRuntimeId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Placeholder operand id for callers that have no specific operand. */
+    public static final String DEFAULT_OPERAND_ID = "N/A";
+
+    private final FeedRuntimeType runtimeType;
+    private final int partition;
+    private final String operandId;
+
+    /**
+     * @param runtimeType the type of the runtime
+     * @param partition the partition number of the runtime
+     * @param operandId the operand id of the runtime
+     */
+    public FeedRuntimeId(FeedRuntimeType runtimeType, int partition, String operandId) {
+        this.runtimeType = runtimeType;
+        this.partition = partition;
+        this.operandId = operandId;
+    }
+
+    /** Returns {@code <runtimeType>[<partition>]{<operandId>}}. */
+    @Override
+    public String toString() {
+        return runtimeType + "[" + partition + "]" + "{" + operandId + "}";
+    }
+
+    /**
+     * Two ids are equal when runtime type, partition and operand id all match. The comparison is
+     * null-safe, so ids built with a {@code null} type or operand id compare without throwing.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FeedRuntimeId)) {
+            return false;
+        }
+        FeedRuntimeId other = (FeedRuntimeId) o;
+        boolean sameOperand = operandId == null ? other.operandId == null : operandId.equals(other.operandId);
+        // Enum constants are singletons, so identity comparison is exact and tolerates null.
+        return runtimeType == other.runtimeType && partition == other.partition && sameOperand;
+    }
+
+    /** Hashes the string form, which is built from exactly the fields compared by {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    /** Returns the runtime type; same value as {@link #getRuntimeType()}. */
+    public FeedRuntimeType getFeedRuntimeType() {
+        return runtimeType;
+    }
+
+    /** Returns the partition number. */
+    public int getPartition() {
+        return partition;
+    }
+
+    /** Returns the runtime type; same value as {@link #getFeedRuntimeType()}. */
+    public FeedRuntimeType getRuntimeType() {
+        return runtimeType;
+    }
+
+    /** Returns the operand id. */
+    public String getOperandId() {
+        return operandId;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java
new file mode 100644
index 0000000..8db1c9d
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java
@@ -0,0 +1,426 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.DataBucket.ContentType;
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Provides for error-handling and input-side buffering for a feed runtime.
+ */
+public class FeedRuntimeInputHandler implements IFrameWriter {
+
+ private static Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
+
+ private final FeedConnectionId connectionId;
+ private final FeedRuntimeId runtimeId;
+ private final FeedPolicyAccessor feedPolicyAccessor;
+ private final boolean bufferingEnabled;
+ private final IExceptionHandler exceptionHandler;
+ private final FeedFrameDiscarder discarder;
+ private final FeedFrameSpiller spiller;
+ private final FeedPolicyAccessor fpa;
+ private final IFeedManager feedManager;
+
+ private IFrameWriter coreOperator;
+ private MonitoredBuffer mBuffer;
+ private DataBucketPool pool;
+ private FrameCollection frameCollection;
+ private Mode mode;
+ private Mode lastMode;
+ private boolean finished;
+ private long nProcessed;
+ private boolean throttlingEnabled;
+
+ private FrameEventCallback frameEventCallback;
+
+ /**
+  * Wires up the input-side handler: creates the spiller, discarder and exception handler, obtains
+  * the bucket pool and frame collection from the feed memory manager, and creates and starts the
+  * monitored buffer. The handler begins in PROCESS mode with throttling disabled.
+  * Note that {@code this} is handed to the discarder, the frame event callback and the monitored
+  * buffer before the constructor has finished.
+  */
+ public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId, IFrameWriter coreOperator,
+ FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
+ RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws IOException {
+ this.connectionId = connectionId;
+ this.runtimeId = runtimeId;
+ this.coreOperator = coreOperator;
+ this.bufferingEnabled = bufferingEnabled;
+ // The same policy accessor is stored twice (feedPolicyAccessor here, fpa below).
+ this.feedPolicyAccessor = fpa;
+ this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
+ this.discarder = new FeedFrameDiscarder(ctx, connectionId, runtimeId, fpa, this);
+ this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
+ this.mode = Mode.PROCESS;
+ this.lastMode = Mode.PROCESS;
+ this.finished = false;
+ this.fpa = fpa;
+ this.feedManager = feedManager;
+ this.pool = (DataBucketPool) feedManager.getFeedMemoryManager().getMemoryComponent(
+ IFeedMemoryComponent.Type.POOL);
+ this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager().getMemoryComponent(
+ IFeedMemoryComponent.Type.COLLECTION);
+ this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
+ this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
+ feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
+ nPartitions, fpa);
+ this.mBuffer.start();
+ this.throttlingEnabled = false;
+ }
+
+ /**
+  * Routes an incoming frame according to the handler's current mode: process it, spill it,
+  * discard it, buffer it while stalled, or ignore it once the handler has ended or failed.
+  *
+  * @param frame the incoming frame
+  * @throws HyracksDataException wrapping any exception raised while handling the frame
+  */
+ public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ try {
+ switch (mode) {
+ case PROCESS:
+ // If the previous mode left a backlog, replay it before handling the new frame.
+ switch (lastMode) {
+ case SPILL:
+ case POST_SPILL_DISCARD:
+ setMode(Mode.PROCESS_SPILL);
+ processSpilledBacklog();
+ break;
+ case STALL:
+ setMode(Mode.PROCESS_BACKLOG);
+ processBufferredBacklog();
+ break;
+ default:
+ break;
+ }
+ process(frame);
+ break;
+ case PROCESS_BACKLOG:
+ case PROCESS_SPILL:
+ process(frame);
+ break;
+ case SPILL:
+ spill(frame);
+ break;
+ case DISCARD:
+ case POST_SPILL_DISCARD:
+ discard(frame);
+ break;
+ case STALL:
+ // While stalled, only the runtime types listed below buffer frames; any other type drops the frame.
+ switch (runtimeId.getFeedRuntimeType()) {
+ case COLLECT:
+ case COMPUTE_COLLECT:
+ case COMPUTE:
+ case STORE:
+ bufferDataUntilRecovery(frame);
+ break;
+ default:
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Discarding frame during " + mode + " mode " + this.runtimeId);
+ }
+ break;
+ }
+ break;
+ case END:
+ case FAIL:
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Ignoring incoming tuples in " + mode + " mode");
+ }
+ break;
+ }
+ } catch (Exception e) {
+ // The stack trace is printed and the exception is then rethrown wrapped.
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+  * Holds a frame in the in-memory frame collection while the runtime is stalled. If no
+  * collection can be obtained the frame is handed to the discarder; if the collection rejects
+  * the frame it is spilled (when the policy allows spilling) or otherwise handed to the discarder.
+  * The results of the spiller and discarder calls are not checked here.
+  */
+ private void bufferDataUntilRecovery(ByteBuffer frame) throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
+ }
+ // The collection is released after a backlog replay, so request one again if needed.
+ if (frameCollection == null) {
+ this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager().getMemoryComponent(
+ IFeedMemoryComponent.Type.COLLECTION);
+ }
+ if (frameCollection == null) {
+ discarder.processMessage(frame);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Running low on memory! DISCARDING FRAME ");
+ }
+ } else {
+ boolean success = frameCollection.addFrame(frame);
+ if (!success) {
+ if (fpa.spillToDiskOnCongestion()) {
+ if (frame != null) {
+ spiller.processMessage(frame);
+ } // TODO handle the else case
+ } else {
+ discarder.processMessage(frame);
+ }
+ }
+ }
+ }
+
+ /**
+  * Reports congestion that could not be resolved locally. A COMPUTE runtime sends a congestion
+  * message (carrying inflow rate, outflow rate and current mode) through the feed message
+  * service; any other runtime type only logs a warning.
+  */
+ public void reportUnresolvableCongestion() throws HyracksDataException {
+     boolean computeRuntime = this.runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE);
+     if (!computeRuntime) {
+         if (LOGGER.isLoggable(Level.WARNING)) {
+             LOGGER.warning("Unresolvable congestion at " + this.connectionId + " " + this.runtimeId);
+         }
+         return;
+     }
+
+     FeedCongestionMessage report = new FeedCongestionMessage(connectionId, runtimeId, mBuffer.getInflowRate(),
+             mBuffer.getOutflowRate(), mode);
+     feedManager.getFeedMessageService().sendMessage(report);
+     if (LOGGER.isLoggable(Level.WARNING)) {
+         LOGGER.warning("Congestion reported " + this.connectionId + " " + this.runtimeId);
+     }
+ }
+
+ /**
+  * Replays the frames held in the in-memory frame collection through {@link #process(ByteBuffer)},
+  * then sends an EOSD bucket to the monitored buffer and releases the collection.
+  * NOTE(review): process() guards against a null bucket from the pool, but the bucket obtained
+  * here is used unchecked; confirm the pool cannot return null at this point.
+  * NOTE(review): nProcessed is incremented here and, when buffering is enabled, also inside
+  * process(); confirm the double count is intended.
+  */
+ private void processBufferredBacklog() throws HyracksDataException {
+ try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Processing backlog " + this.runtimeId);
+ }
+
+ if (frameCollection != null) {
+ Iterator<ByteBuffer> backlog = frameCollection.getFrameCollectionIterator();
+ while (backlog.hasNext()) {
+ process(backlog.next());
+ nProcessed++;
+ }
+ DataBucket bucket = pool.getDataBucket();
+ bucket.setContentType(ContentType.EOSD);
+ bucket.setDesiredReadCount(1);
+ mBuffer.sendMessage(bucket);
+ feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+ // Cleared so that a later stall requests a fresh collection.
+ frameCollection = null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+  * Replays the frames returned by the spiller through {@link #process(ByteBuffer)}, then sends
+  * an EOSD bucket to the monitored buffer and resets the spiller.
+  * NOTE(review): the bucket obtained from the pool is used without a null check, unlike in
+  * process(); confirm the pool cannot return null at this point.
+  */
+ private void processSpilledBacklog() throws HyracksDataException {
+ try {
+ Iterator<ByteBuffer> backlog = spiller.replayData();
+ while (backlog.hasNext()) {
+ process(backlog.next());
+ nProcessed++;
+ }
+ DataBucket bucket = pool.getDataBucket();
+ bucket.setContentType(ContentType.EOSD);
+ bucket.setDesiredReadCount(1);
+ mBuffer.sendMessage(bucket);
+ spiller.reset();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+  * Delivers a frame to the core operator, either synchronously (buffering disabled) or by
+  * copying it into a pooled bucket and handing that to the monitored buffer. When no bucket is
+  * available the policy decides: spill to disk, discard, enable throttling, or report congestion.
+  * On an exception the frame is retried with whatever the exception handler returns, provided
+  * the policy continues on soft failures; otherwise the buffer is closed and the exception rethrown.
+  *
+  * @param frame the frame to deliver; {@code null} is sent as an EOD bucket when buffering is enabled
+  * @throws HyracksDataException if processing fails and the policy does not continue on soft failures
+  */
+ protected void process(ByteBuffer frame) throws HyracksDataException {
+ boolean frameProcessed = false;
+ while (!frameProcessed) {
+ try {
+ if (!bufferingEnabled) {
+ coreOperator.nextFrame(frame); // synchronous
+ mBuffer.sendReport(frame);
+ } else {
+ DataBucket bucket = pool.getDataBucket();
+ if (bucket != null) {
+ if (frame != null) {
+ bucket.reset(frame); // created a copy here
+ bucket.setContentType(ContentType.DATA);
+ } else {
+ bucket.setContentType(ContentType.EOD);
+ }
+ bucket.setDesiredReadCount(1);
+ mBuffer.sendMessage(bucket);
+ mBuffer.sendReport(frame);
+ nProcessed++;
+ } else {
+ // No bucket available: apply the congestion policy, checked in the order spill, discard, throttle.
+ if (fpa.spillToDiskOnCongestion()) {
+ if (frame != null) {
+ boolean spilled = spiller.processMessage(frame);
+ if (spilled) {
+ setMode(Mode.SPILL);
+ } else {
+ reportUnresolvableCongestion();
+ }
+ }
+ } else if (fpa.discardOnCongestion()) {
+ boolean discarded = discarder.processMessage(frame);
+ if (!discarded) {
+ reportUnresolvableCongestion();
+ }
+ } else if (fpa.throttlingEnabled()) {
+ // NOTE(review): the frame is not forwarded, spilled or discarded on this path; confirm intended.
+ setThrottlingEnabled(true);
+ } else {
+ reportUnresolvableCongestion();
+ }
+
+ }
+ }
+ frameProcessed = true;
+ } catch (Exception e) {
+ if (feedPolicyAccessor.continueOnSoftFailure()) {
+ // Retry with the frame returned by the handler; a null frame ends the retry loop.
+ frame = exceptionHandler.handleException(e, frame);
+ if (frame == null) {
+ frameProcessed = true;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Encountered exception! " + e.getMessage()
+ + "Insufficient information, Cannot extract failing tuple");
+ }
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Ingestion policy does not require recovering from tuple. Feed would terminate");
+ }
+ mBuffer.close(false);
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ }
+
+ /**
+  * Hands the frame to the spiller. If the spiller does not accept it, the handler switches to
+  * POST_SPILL_DISCARD mode and reports unresolvable congestion.
+  */
+ private void spill(ByteBuffer frame) throws Exception {
+     if (spiller.processMessage(frame)) {
+         return;
+     }
+     // limit reached
+     setMode(Mode.POST_SPILL_DISCARD);
+     reportUnresolvableCongestion();
+ }
+
+ /**
+  * Hands the frame to the discarder and reports unresolvable congestion if the discarder does
+  * not accept it.
+  */
+ private void discard(ByteBuffer frame) throws Exception {
+     boolean discarded = discarder.processMessage(frame);
+     if (discarded) {
+         return;
+     }
+     // limit reached
+     reportUnresolvableCongestion();
+ }
+
+ /** Returns the current mode. This read is not synchronized, unlike {@link #setMode(Mode)}. */
+ public Mode getMode() {
+ return mode;
+ }
+
+ /**
+  * Switches the handler to a new mode and remembers the previous one in {@code lastMode}.
+  * Setting the mode it is already in does nothing. Switching to END also closes the handler.
+  *
+  * @param mode the mode to switch to
+  */
+ public synchronized void setMode(Mode mode) {
+ if (mode.equals(this.mode)) {
+ return;
+ }
+ this.lastMode = this.mode;
+ this.mode = mode;
+ if (mode.equals(Mode.END)) {
+ this.close();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switched from " + lastMode + " to " + mode + " " + this.runtimeId);
+ }
+ }
+
+ /**
+  * Releases the frame collection and bucket pool (when present) back to the feed memory manager
+  * and closes the monitored buffer. Monitoring is disabled on close unless the handler is
+  * currently in STALL mode.
+  */
+ public void close() {
+ if (mBuffer != null) {
+ boolean disableMonitoring = !this.mode.equals(Mode.STALL);
+ if (frameCollection != null) {
+ feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+ }
+ if (pool != null) {
+ feedManager.getFeedMemoryManager().releaseMemoryComponent(pool);
+ }
+ mBuffer.close(false, disableMonitoring);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed input side handler for " + this.runtimeId + " disabled monitoring "
+ + disableMonitoring + " Mode for runtime " + this.mode);
+ }
+ }
+ }
+
+ /** Returns the writer that frames are delivered to. */
+ public IFrameWriter getCoreOperator() {
+ return coreOperator;
+ }
+
+ /** Replaces the core operator here, in the monitored buffer and in the frame event callback. */
+ public void setCoreOperator(IFrameWriter coreOperator) {
+ this.coreOperator = coreOperator;
+ mBuffer.setFrameWriter(coreOperator);
+ frameEventCallback.setCoreOperator(coreOperator);
+ }
+
+ /** Returns the finished flag. */
+ public boolean isFinished() {
+ return finished;
+ }
+
+ /** Sets the finished flag. */
+ public void setFinished(boolean finished) {
+ this.finished = finished;
+ }
+
+ /** Returns the value of the processed-frame counter. */
+ public long getProcessed() {
+ return nProcessed;
+ }
+
+ /** Returns the id of the runtime this handler serves. */
+ public FeedRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
+ /** Opens the core operator. */
+ @Override
+ public void open() throws HyracksDataException {
+ coreOperator.open();
+ }
+
+ /** Propagates the failure signal to the core operator. */
+ @Override
+ public void fail() throws HyracksDataException {
+ coreOperator.fail();
+ }
+
+ /**
+  * Updates the number of partitions on the monitored buffer and resets it. Does nothing when
+  * there is no monitored buffer.
+  *
+  * @param nPartitions the new number of partitions
+  */
+ public void reset(int nPartitions) {
+     // Guard the whole operation: the original dereferenced mBuffer before its own null check.
+     if (mBuffer == null) {
+         return;
+     }
+     mBuffer.setNumberOfPartitions(nPartitions);
+     if (LOGGER.isLoggable(Level.INFO)) {
+         LOGGER.info("Reset number of partitions to " + nPartitions + " for " + this.runtimeId);
+     }
+     mBuffer.reset();
+ }
+
+ /** Returns the id of the feed connection this handler belongs to. */
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ /** Returns the feed manager supplied at construction. */
+ public IFeedManager getFeedManager() {
+ return feedManager;
+ }
+
+ /** Returns the monitored buffer used by this handler. */
+ public MonitoredBuffer getmBuffer() {
+ return mBuffer;
+ }
+
+ /** Returns whether throttling is currently flagged as enabled. */
+ public boolean isThrottlingEnabled() {
+ return throttlingEnabled;
+ }
+
+ /**
+  * Updates the throttling flag. When the value actually changes, a throttling message is sent
+  * through the feed message service and the change is logged; setting the current value again
+  * does nothing.
+  */
+ public void setThrottlingEnabled(boolean throttlingEnabled) {
+     if (this.throttlingEnabled == throttlingEnabled) {
+         return;
+     }
+     this.throttlingEnabled = throttlingEnabled;
+     IFeedMessage notice = new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+     feedManager.getFeedMessageService().sendMessage(notice);
+     if (LOGGER.isLoggable(Level.WARNING)) {
+         LOGGER.warning("Throttling " + throttlingEnabled + " for " + this.connectionId + "[" + runtimeId + "]");
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
index a68f6b8..d0438f8 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
@@ -15,90 +15,42 @@
package edu.uci.ics.asterix.common.feeds;
import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
public class FeedRuntimeManager {
private static Logger LOGGER = Logger.getLogger(FeedRuntimeManager.class.getName());
- private final FeedConnectionId feedId;
- private final IFeedManager feedManager;
- private SuperFeedManager superFeedManager;
+ private final FeedConnectionId connectionId;
+ private final IFeedConnectionManager connectionManager;
private final Map<FeedRuntimeId, FeedRuntime> feedRuntimes;
+
private final ExecutorService executorService;
- private FeedMessageService messageService;
- private SocketFactory socketFactory = new SocketFactory();
- private final LinkedBlockingQueue<String> feedReportQueue;
- public FeedRuntimeManager(FeedConnectionId feedId, IFeedManager feedManager) {
- this.feedId = feedId;
- feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
- executorService = Executors.newCachedThreadPool();
- feedReportQueue = new LinkedBlockingQueue<String>();
- this.feedManager = feedManager;
+ public FeedRuntimeManager(FeedConnectionId connectionId, IFeedConnectionManager feedConnectionManager) {
+ this.connectionId = connectionId;
+ this.feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
+ this.executorService = Executors.newCachedThreadPool();
+ this.connectionManager = feedConnectionManager;
}
- public void close(boolean closeAll) throws IOException {
- socketFactory.close();
-
- if (messageService != null) {
- messageService.stop();
+ public void close() throws IOException {
+ if (executorService != null) {
+ executorService.shutdownNow();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down message services for :" + feedId);
- }
- messageService = null;
- }
- if (superFeedManager != null && superFeedManager.isLocal()) {
- superFeedManager.stop();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down super feed manager for :" + feedId);
- }
- }
-
- if (closeAll) {
- if (executorService != null) {
- executorService.shutdownNow();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + feedId);
- }
+ LOGGER.info("Shut down executor service for :" + connectionId);
}
}
}
- public void setSuperFeedManager(SuperFeedManager sfm) throws UnknownHostException, IOException, Exception {
- this.superFeedManager = sfm;
- if (sfm.isLocal()) {
- sfm.start();
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Started Super Feed Manager for feed :" + feedId);
- }
- this.messageService = new FeedMessageService(feedId, feedManager);
- messageService.start();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Started Feed Message Service for feed :" + feedId);
- }
- }
-
- public SuperFeedManager getSuperFeedManager() {
- return superFeedManager;
- }
-
public FeedRuntime getFeedRuntime(FeedRuntimeId runtimeId) {
return feedRuntimes.get(runtimeId);
}
@@ -107,17 +59,10 @@ public class FeedRuntimeManager {
feedRuntimes.put(runtimeId, feedRuntime);
}
- public void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
+ public synchronized void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
feedRuntimes.remove(runtimeId);
if (feedRuntimes.isEmpty()) {
- synchronized (this) {
- if (feedRuntimes.isEmpty()) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("De-registering feed");
- }
- feedManager.deregisterFeed(runtimeId.getFeedId());
- }
- }
+ connectionManager.deregisterFeed(connectionId);
}
}
@@ -125,114 +70,8 @@ public class FeedRuntimeManager {
return executorService;
}
- public FeedMessageService getMessageService() {
- return messageService;
- }
-
- public Socket createClientSocket(String host, int port) throws UnknownHostException, IOException {
- return socketFactory.createClientSocket(host, port);
- }
-
- public Socket createClientSocket(String host, int port, long timeout) throws UnknownHostException, IOException {
- Socket client = null;
- boolean continueAttempt = true;
- long startAttempt = System.currentTimeMillis();
- long endAttempt = System.currentTimeMillis();
- while (client == null && continueAttempt) {
- try {
- client = socketFactory.createClientSocket(host, port);
- } catch (Exception e) {
- endAttempt = System.currentTimeMillis();
- if (endAttempt - startAttempt > timeout) {
- continueAttempt = false;
- }
- }
- }
- return client;
- }
-
- public ServerSocket createServerSocket(int port) throws IOException {
- return socketFactory.createServerSocket(port);
- }
-
- private static class SocketFactory {
-
- private final Map<SocketId, Socket> sockets = new HashMap<SocketId, Socket>();
- private final List<ServerSocket> serverSockets = new ArrayList<ServerSocket>();
-
- public Socket createClientSocket(String host, int port) throws UnknownHostException, IOException {
- Socket socket = new Socket(host, port);
- sockets.put(new SocketId(host, port), socket);
- return socket;
- }
-
- public void close() throws IOException {
- for (ServerSocket socket : serverSockets) {
- socket.close();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closed server socket :" + socket);
- }
- }
-
- for (Entry<SocketId, Socket> entry : sockets.entrySet()) {
- entry.getValue().close();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closed client socket :" + entry.getKey());
- }
- }
- }
-
- public ServerSocket createServerSocket(int port) throws IOException {
- ServerSocket socket = new ServerSocket(port);
- serverSockets.add(socket);
- return socket;
- }
-
- private static class SocketId {
- private final String host;
- private final int port;
-
- public SocketId(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- @Override
- public String toString() {
- return host + "[" + port + "]";
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof SocketId)) {
- return false;
- }
-
- return ((SocketId) o).getHost().equals(host) && ((SocketId) o).getPort() == port;
- }
-
- }
- }
-
- public FeedConnectionId getFeedId() {
- return feedId;
- }
-
- public LinkedBlockingQueue<String> getFeedReportQueue() {
- return feedReportQueue;
+ public Set<FeedRuntimeId> getFeedRuntimes() {
+ return feedRuntimes.keySet();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java
new file mode 100644
index 0000000..868bcc7
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java
@@ -0,0 +1,5 @@
+package edu.uci.ics.asterix.common.feeds;
+
+/**
+ * Feed runtime report type. It currently declares no fields or methods.
+ */
+public class FeedRuntimeReport {
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java
new file mode 100644
index 0000000..1edf210
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.asterix.common.feeds;
+
+/**
+ * Holder for feed storage statistics. Both fields are private and the class declares
+ * no constructor or accessor, so nothing reads or writes them yet.
+ */
+public class FeedStorageStatistics {
+
+ // NOTE(review): presumably an average delay before persistence; unit not stated here -- confirm.
+ private long avgPersistenceDelay;
+
+ // NOTE(review): presumably the time of the most recent intake; epoch/unit not stated here -- confirm.
+ private long lastIntakeTimestamp;
+}