You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:13 UTC
[16/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java
new file mode 100644
index 0000000..aa114e6
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.config;
+
+public class AsterixFeedProperties extends AbstractAsterixProperties {
+
+ private static final String FEED_CENTRAL_MANAGER_PORT_KEY = "feed.central.manager.port";
+ private static final int FEED_CENTRAL_MANAGER_PORT_DEFAULT = 4500; // port at which the Central Feed Manager listens for control messages from local Feed Managers
+
+ private static final String FEED_MEMORY_GLOBALBUDGET_KEY = "feed.memory.global.budget";
+ private static final long FEED_MEMORY_GLOBALBUDGET_DEFAULT = 67108864; // 64MB or 2048 frames (assuming 32768 as frame size)
+
+ private static final String FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_KEY = "feed.memory.available.wait.timeout";
+ private static final long FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_DEFAULT = 10; // 10 seconds
+
+ private static final String FEED_PENDING_WORK_THRESHOLD_KEY = "feed.pending.work.threshold";
+ private static final int FEED_PENDING_WORK_THRESHOLD_DEFAULT = 50; // maximum length of input queue before triggering corrective action
+
+ private static final String FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_KEY = "feed.max.threshold.period";
+ private static final int FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_DEFAULT = 5;
+
+ public AsterixFeedProperties(AsterixPropertiesAccessor accessor) {
+ super(accessor);
+ }
+
+ public long getMemoryComponentGlobalBudget() {
+ return accessor.getProperty(FEED_MEMORY_GLOBALBUDGET_KEY, FEED_MEMORY_GLOBALBUDGET_DEFAULT,
+ PropertyInterpreters.getLongPropertyInterpreter());
+ }
+
+ public long getMemoryAvailableWaitTimeout() {
+ return accessor.getProperty(FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_KEY, FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_DEFAULT,
+ PropertyInterpreters.getLongPropertyInterpreter());
+ }
+
+ public int getFeedCentralManagerPort() {
+ return accessor.getProperty(FEED_CENTRAL_MANAGER_PORT_KEY, FEED_CENTRAL_MANAGER_PORT_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getPendingWorkThreshold() {
+ return accessor.getProperty(FEED_PENDING_WORK_THRESHOLD_KEY, FEED_PENDING_WORK_THRESHOLD_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getMaxSuccessiveThresholdPeriod() {
+ return accessor.getProperty(FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_KEY,
+ FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
index e696519..32386f6 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
@@ -24,4 +24,6 @@ public interface IAsterixPropertiesProvider {
public AsterixMetadataProperties getMetadataProperties();
public AsterixExternalProperties getExternalProperties();
+
+ public AsterixFeedProperties getFeedProperties();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java
new file mode 100644
index 0000000..4651607
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.exceptions;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FrameDataException extends HyracksDataException {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int tupleIndex;
+
+ public FrameDataException(int tupleIndex, Exception cause) {
+ super(cause);
+ this.tupleIndex = tupleIndex;
+ }
+
+ public int getTupleIndex() {
+ return tupleIndex;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java
new file mode 100644
index 0000000..ba38f87
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class BasicMonitoredBuffer extends MonitoredBuffer {
+
+ public BasicMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter, FrameTupleAccessor fta,
+ RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+ FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+ IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+ super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+ exceptionHandler, callback, nPartitions, policyAccessor);
+ }
+
+ @Override
+ protected boolean monitorProcessingRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean logInflowOutflowRate() {
+ return false;
+ }
+
+ @Override
+ protected IFramePreprocessor getFramePreProcessor() {
+ return null;
+ }
+
+ @Override
+ protected IFramePostProcessor getFramePostProcessor() {
+ return null;
+ }
+
+ @Override
+ protected boolean monitorInputQueueLength() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportInflowRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportOutflowRate() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java
new file mode 100644
index 0000000..e324617
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector.State;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.asterix.common.feeds.api.ISubscriberRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Represents the feed runtime that collects feed tuples from another feed.
+ * In case of a primary feed, the CollectionRuntime collects tuples from the feed
+ * intake job. For a secondary feed, tuples are collected from the intake/compute
+ * runtime associated with the source feed.
+ */
+public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
+
+ private final FeedConnectionId connectionId;
+ private final ISubscribableRuntime sourceRuntime;
+ private final Map<String, String> feedPolicy;
+ private FeedFrameCollector frameCollector;
+
+ public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter,
+ ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy) {
+ super(runtimeId, inputSideHandler, outputSideWriter);
+ this.connectionId = connectionId;
+ this.sourceRuntime = sourceRuntime;
+ this.feedPolicy = feedPolicy;
+ }
+
+ public State waitTillCollectionOver() throws InterruptedException {
+ if (!(isCollectionOver())) {
+ synchronized (frameCollector) {
+ while (!isCollectionOver()) {
+ frameCollector.wait();
+ }
+ }
+ }
+ return frameCollector.getState();
+ }
+
+ private boolean isCollectionOver() {
+ return frameCollector.getState().equals(FeedFrameCollector.State.FINISHED)
+ || frameCollector.getState().equals(FeedFrameCollector.State.HANDOVER);
+ }
+
+ public void setMode(Mode mode) {
+ getInputHandler().setMode(mode);
+ }
+
+ @Override
+ public Map<String, String> getFeedPolicy() {
+ return feedPolicy;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public ISubscribableRuntime getSourceRuntime() {
+ return sourceRuntime;
+ }
+
+ public void setFrameCollector(FeedFrameCollector frameCollector) {
+ this.frameCollector = frameCollector;
+ }
+
+ public FeedFrameCollector getFrameCollector() {
+ return frameCollector;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java
new file mode 100644
index 0000000..161f27a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java
@@ -0,0 +1,55 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class ComputeSideMonitoredBuffer extends MonitoredBuffer {
+
+ public ComputeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
+ FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
+ FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
+ IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
+ super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+ exceptionHandler, callback, nPartitions, policyAccessor);
+ }
+
+ @Override
+ protected boolean monitorProcessingRate() {
+ return true;
+ }
+
+ protected boolean logInflowOutflowRate() {
+ return true;
+ }
+
+ @Override
+ protected boolean monitorInputQueueLength() {
+ return true;
+ }
+
+ @Override
+ protected IFramePreprocessor getFramePreProcessor() {
+ return null;
+ }
+
+ @Override
+ protected IFramePostProcessor getFramePostProcessor() {
+ return null;
+ }
+
+ @Override
+ protected boolean reportOutflowRate() {
+ return false;
+ }
+
+ @Override
+ protected boolean reportInflowRate() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java
new file mode 100644
index 0000000..ea4480e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataBucket {
+
+ private static final AtomicInteger globalBucketId = new AtomicInteger(0);
+
+ private final ByteBuffer content;
+ private final AtomicInteger readCount;
+ private final int bucketId;
+
+ private int desiredReadCount;
+ private ContentType contentType;
+
+ private final DataBucketPool pool;
+
+ public enum ContentType {
+ DATA, // data (feed tuple)
+ EOD, // A signal indicating that there shall be no more data
+ EOSD // End of processing of spilled data
+ }
+
+ public DataBucket(DataBucketPool pool) {
+ this.content = ByteBuffer.allocate(pool.getFrameSize());
+ this.readCount = new AtomicInteger(0);
+ this.pool = pool;
+ this.contentType = ContentType.DATA;
+ this.bucketId = globalBucketId.incrementAndGet();
+ }
+
+ public synchronized void reset(ByteBuffer frame) {
+ if (frame != null) {
+ content.flip();
+ System.arraycopy(frame.array(), 0, content.array(), 0, frame.limit());
+ content.limit(frame.limit());
+ content.position(0);
+ }
+ }
+
+ public synchronized void doneReading() {
+ if (readCount.incrementAndGet() == desiredReadCount) {
+ readCount.set(0);
+ pool.returnDataBucket(this);
+ }
+ }
+
+ public void setDesiredReadCount(int rCount) {
+ this.desiredReadCount = rCount;
+ }
+
+ public ContentType getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(ContentType contentType) {
+ this.contentType = contentType;
+ }
+
+ public synchronized ByteBuffer getContent() {
+ return content;
+ }
+
+ @Override
+ public String toString() {
+ return "DataBucket [" + bucketId + "]" + " (" + readCount + "," + desiredReadCount + ")";
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java
new file mode 100644
index 0000000..1a66b38
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.Stack;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+
+/**
+ * Represents a pool of reusable {@link DataBucket}
+ */
+public class DataBucketPool implements IFeedMemoryComponent {
+
+ /** A unique identifier for the memory component **/
+ private final int componentId;
+
+ /** The {@link IFeedMemoryManager} for the NodeController **/
+ private final IFeedMemoryManager memoryManager;
+
+ /** A collection of available data buckets {@link DataBucket} **/
+ private final Stack<DataBucket> pool;
+
+ /** The total number of data buckets {@link DataBucket} allocated **/
+ private int totalAllocation;
+
+ /** The fixed frame size as configured for the asterix runtime **/
+ private final int frameSize;
+
+ public DataBucketPool(int componentId, IFeedMemoryManager memoryManager, int size, int frameSize) {
+ this.componentId = componentId;
+ this.memoryManager = memoryManager;
+ this.pool = new Stack<DataBucket>();
+ this.frameSize = frameSize;
+ expand(size);
+ }
+
+ public synchronized void returnDataBucket(DataBucket bucket) {
+ pool.push(bucket);
+ }
+
+ public synchronized DataBucket getDataBucket() {
+ if (pool.size() == 0) {
+ if (!memoryManager.expandMemoryComponent(this)) {
+ return null;
+ }
+ }
+ return pool.pop();
+ }
+
+ @Override
+ public Type getType() {
+ return Type.POOL;
+ }
+
+ @Override
+ public int getTotalAllocation() {
+ return totalAllocation;
+ }
+
+ @Override
+ public int getComponentId() {
+ return componentId;
+ }
+
+ @Override
+ public void expand(int delta) {
+ for (int i = 0; i < delta; i++) {
+ DataBucket bucket = new DataBucket(this);
+ pool.add(bucket);
+ }
+ totalAllocation += delta;
+ }
+
+ @Override
+ public void reset() {
+ totalAllocation -= pool.size();
+ pool.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "DataBucketPool" + "[" + componentId + "]" + "(" + totalAllocation + ")";
+ }
+
+ public int getSize() {
+ return pool.size();
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java
new file mode 100644
index 0000000..1e1baca
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler.Type;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Provides mechanism for distributing the frames, as received from an operator to a
+ * set of registered readers. Each reader typically operates at a different pace. Readers
+ * are isolated from each other to ensure that a slow reader does not impact the progress of
+ * others.
+ **/
+public class DistributeFeedFrameWriter implements IFrameWriter {
+
+ private static final Logger LOGGER = Logger.getLogger(DistributeFeedFrameWriter.class.getName());
+
+ /** A unique identifier for the feed to which the incoming tuples belong. **/
+ private final FeedId feedId;
+
+ /** An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each operating in isolation. **/
+ private final FrameDistributor frameDistributor;
+
+ /** The original frame writer instantiated as part of job creation **/
+ private IFrameWriter writer;
+
+ /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
+ private final FeedRuntimeType feedRuntimeType;
+
+ /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
+ private final int partition;
+
+ public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer, FeedRuntimeType feedRuntimeType,
+ int partition, FrameTupleAccessor fta, IFeedManager feedManager) throws IOException {
+ this.feedId = feedId;
+ this.frameDistributor = new FrameDistributor(ctx, feedId, feedRuntimeType, partition, true,
+ feedManager.getFeedMemoryManager(), fta);
+ this.feedRuntimeType = feedRuntimeType;
+ this.partition = partition;
+ this.writer = writer;
+ }
+
+ public FeedFrameCollector subscribeFeed(FeedPolicyAccessor fpa, IFrameWriter frameWriter,
+ FeedConnectionId connectionId) throws Exception {
+ FeedFrameCollector collector = null;
+ if (!frameDistributor.isRegistered(frameWriter)) {
+ collector = new FeedFrameCollector(frameDistributor, fpa, frameWriter, connectionId);
+ frameDistributor.registerFrameCollector(collector);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered subscriber, new mode " + frameDistributor.getMode());
+ }
+ return collector;
+ } else {
+ throw new IllegalStateException("subscriber " + feedId + " already registered");
+ }
+ }
+
+ public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
+ boolean success = frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter);
+ if (!success) {
+ throw new IllegalStateException("Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter
+ + " not registered.");
+ }
+ }
+
+ public void notifyEndOfFeed() {
+ frameDistributor.notifyEndOfFeed();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ frameDistributor.close();
+ writer.close();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ frameDistributor.nextFrame(frame);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
+ return frameDistributor.getRegisteredReaders();
+ }
+
+ public void setWriter(IFrameWriter writer) {
+ this.writer = writer;
+ }
+
+ public Type getType() {
+ return IFeedOperatorOutputSideHandler.Type.DISTRIBUTE_FEED_OUTPUT_HANDLER;
+ }
+
+ @Override
+ public String toString() {
+ return feedId.toString() + feedRuntimeType + "[" + partition + "]";
+ }
+
+ public FrameDistributor.DistributionMode getDistributionMode() {
+ return frameDistributor.getDistributionMode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java
new file mode 100644
index 0000000..dc19555
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.Map;
+
+public class FeedActivity implements Comparable<FeedActivity> {
+
+ private int activityId;
+
+ private final String dataverseName;
+ private final String datasetName;
+ private final String feedName;
+ private final Map<String, String> feedActivityDetails;
+
+ public static class FeedActivityDetails {
+ public static final String INTAKE_LOCATIONS = "intake-locations";
+ public static final String COMPUTE_LOCATIONS = "compute-locations";
+ public static final String STORAGE_LOCATIONS = "storage-locations";
+ public static final String COLLECT_LOCATIONS = "collect-locations";
+ public static final String FEED_POLICY_NAME = "feed-policy-name";
+ public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
+
+ }
+
+ public FeedActivity(String dataverseName, String feedName, String datasetName,
+ Map<String, String> feedActivityDetails) {
+ this.dataverseName = dataverseName;
+ this.feedName = feedName;
+ this.datasetName = datasetName;
+ this.feedActivityDetails = feedActivityDetails;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ public String getFeedName() {
+ return feedName;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof FeedActivity)) {
+ return false;
+ }
+
+ if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
+ return false;
+ }
+ if (!((FeedActivity) other).datasetName.equals(datasetName)) {
+ return false;
+ }
+ if (!((FeedActivity) other).getFeedName().equals(feedName)) {
+ return false;
+ }
+ if (((FeedActivity) other).getActivityId() != (activityId)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
+ }
+
+ public String getConnectTimestamp() {
+ return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
+ }
+
+ public int getActivityId() {
+ return activityId;
+ }
+
+ public void setActivityId(int activityId) {
+ this.activityId = activityId;
+ }
+
+ public Map<String, String> getFeedActivityDetails() {
+ return feedActivityDetails;
+ }
+
+ @Override
+ public int compareTo(FeedActivity o) {
+ return o.getActivityId() - this.activityId;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
new file mode 100644
index 0000000..161408a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FeedCollectRuntimeInputHandler extends FeedRuntimeInputHandler {
+
+ private final FeedFrameCache feedFrameCache;
+
+ public FeedCollectRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled,
+ FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions)
+ throws IOException {
+ super(ctx, connectionId, runtimeId, coreOperator, fpa, bufferingEnabled, fta, recordDesc, feedManager,
+ nPartitions);
+ this.feedFrameCache = new FeedFrameCache(ctx, fta, coreOperator);
+ }
+
+ public void process(ByteBuffer frame) throws HyracksDataException {
+ feedFrameCache.sendMessage(frame);
+ super.process(frame);
+ }
+
+ public void replayFrom(int recordId) throws HyracksDataException {
+ feedFrameCache.replayRecords(recordId);
+ }
+
+ public void dropTill(int recordId) {
+ feedFrameCache.dropTillRecordId(recordId);
+ }
+
+ public void replayCached() throws HyracksDataException{
+ feedFrameCache.replayAll();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java
new file mode 100644
index 0000000..1045e6e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedConnectJobInfo extends FeedJobInfo {
+
+ private final FeedConnectionId connectionId;
+ private final Map<String, String> feedPolicy;
+ private final IFeedJoint sourceFeedJoint;
+ private IFeedJoint computeFeedJoint;
+
+ private List<String> collectLocations;
+ private List<String> computeLocations;
+ private List<String> storageLocations;
+
+ public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
+ IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
+ Map<String, String> feedPolicy) {
+ super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
+ this.connectionId = connectionId;
+ this.sourceFeedJoint = sourceFeedJoint;
+ this.computeFeedJoint = computeFeedJoint;
+ this.feedPolicy = feedPolicy;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public List<String> getCollectLocations() {
+ return collectLocations;
+ }
+
+ public List<String> getComputeLocations() {
+ return computeLocations;
+ }
+
+ public List<String> getStorageLocations() {
+ return storageLocations;
+ }
+
+ public void setCollectLocations(List<String> collectLocations) {
+ this.collectLocations = collectLocations;
+ }
+
+ public void setComputeLocations(List<String> computeLocations) {
+ this.computeLocations = computeLocations;
+ }
+
+ public void setStorageLocations(List<String> storageLocations) {
+ this.storageLocations = storageLocations;
+ }
+
+ public IFeedJoint getSourceFeedJoint() {
+ return sourceFeedJoint;
+ }
+
+ public IFeedJoint getComputeFeedJoint() {
+ return computeFeedJoint;
+ }
+
+ public Map<String, String> getFeedPolicy() {
+ return feedPolicy;
+ }
+
+ public void setComputeFeedJoint(IFeedJoint computeFeedJoint) {
+ this.computeFeedJoint = computeFeedJoint;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
index def7c10..e09a4fe 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
@@ -17,28 +17,27 @@ package edu.uci.ics.asterix.common.feeds;
import java.io.Serializable;
/**
- * A unique identifier for a data feed flowing into a dataset.
+ * A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing into a dataset.
*/
public class FeedConnectionId implements Serializable {
private static final long serialVersionUID = 1L;
- private final String dataverse;
- private final String feedName;
+ private final FeedId feedId;
private final String datasetName;
- public FeedConnectionId(String dataverse, String feedName, String datasetName) {
- this.dataverse = dataverse;
- this.feedName = feedName;
+ public FeedConnectionId(FeedId feedId, String datasetName) {
+ this.feedId = feedId;
this.datasetName = datasetName;
}
- public String getDataverse() {
- return dataverse;
+ public FeedConnectionId(String dataverse, String feedName, String datasetName) {
+ this.feedId = new FeedId(dataverse, feedName);
+ this.datasetName = datasetName;
}
- public String getFeedName() {
- return feedName;
+ public FeedId getFeedId() {
+ return feedId;
}
public String getDatasetName() {
@@ -50,9 +49,10 @@ public class FeedConnectionId implements Serializable {
if (o == null || !(o instanceof FeedConnectionId)) {
return false;
}
- if (((FeedConnectionId) o).getFeedName().equals(feedName)
- && ((FeedConnectionId) o).getDataverse().equals(dataverse)
- && ((FeedConnectionId) o).getDatasetName().equals(datasetName)) {
+
+ if (this == o
+ || (((FeedConnectionId) o).getFeedId().equals(feedId) && ((FeedConnectionId) o).getDatasetName()
+ .equals(datasetName))) {
return true;
}
return false;
@@ -65,6 +65,6 @@ public class FeedConnectionId implements Serializable {
@Override
public String toString() {
- return dataverse + "." + feedName + "-->" + datasetName;
+ return feedId.toString() + "-->" + datasetName;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java
new file mode 100644
index 0000000..2343b40
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
+
+/**
+ * A request for connecting a feed to a dataset.
+ */
+public class FeedConnectionRequest {
+
+ public enum ConnectionStatus {
+ /** initial state upon creating a connection request **/
+ INITIALIZED,
+
+ /** connection establish; feed is receiving data **/
+ ACTIVE,
+
+ /** connection removed; feed is not receiving data **/
+ INACTIVE,
+
+ /** connection request failed **/
+ FAILED
+ }
+
+ /** Feed joint on the feed pipeline that serves as the source for this subscription **/
+ private final FeedJointKey feedJointKey;
+
+ /** Location in the source feed pipeline from where feed tuples are received. **/
+ private final ConnectionLocation connectionLocation;
+
+ /** List of functions that need to be applied in sequence after the data hand-off at the source feedPointKey. **/
+ private final List<String> functionsToApply;
+
+ /** Status associated with the subscription. */
+ private ConnectionStatus connectionStatus;
+
+ /** Name of the policy that governs feed ingestion **/
+ private final String policy;
+
+ /** Policy associated with a feed connection **/
+ private final Map<String, String> policyParameters;
+
+ /** Target dataset associated with the connection request **/
+ private final String targetDataset;
+
+ private final FeedId receivingFeedId;
+
+
+ public FeedConnectionRequest(FeedJointKey feedPointKey, ConnectionLocation connectionLocation,
+ List<String> functionsToApply, String targetDataset, String policy, Map<String, String> policyParameters,
+ FeedId receivingFeedId) {
+ this.feedJointKey = feedPointKey;
+ this.connectionLocation = connectionLocation;
+ this.functionsToApply = functionsToApply;
+ this.targetDataset = targetDataset;
+ this.policy = policy;
+ this.policyParameters = policyParameters;
+ this.receivingFeedId = receivingFeedId;
+ this.connectionStatus = ConnectionStatus.INITIALIZED;
+ }
+
+ public FeedJointKey getFeedJointKey() {
+ return feedJointKey;
+ }
+
+ public ConnectionStatus getConnectionStatus() {
+ return connectionStatus;
+ }
+
+ public void setSubscriptionStatus(ConnectionStatus connectionStatus) {
+ this.connectionStatus = connectionStatus;
+ }
+
+ public String getPolicy() {
+ return policy;
+ }
+
+ public String getTargetDataset() {
+ return targetDataset;
+ }
+
+ public ConnectionLocation getSubscriptionLocation() {
+ return connectionLocation;
+ }
+
+ public FeedId getReceivingFeedId() {
+ return receivingFeedId;
+ }
+
+ public Map<String, String> getPolicyParameters() {
+ return policyParameters;
+ }
+
+ public List<String> getFunctionsToApply() {
+ return functionsToApply;
+ }
+
+ @Override
+ public String toString() {
+ return "Feed Connection Request " + feedJointKey + " [" + connectionLocation + "]" + " Apply ("
+ + StringUtils.join(functionsToApply, ",") + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java
new file mode 100644
index 0000000..ae29908
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.asterix.common.feeds;
+
+public class FeedConstants {
+
+ public static final class StatisticsConstants {
+ public static final String INTAKE_TUPLEID = "intake-tupleid";
+ public static final String INTAKE_PARTITION = "intake-partition";
+ public static final String INTAKE_TIMESTAMP = "intake-timestamp";
+ public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
+ public static final String STORE_TIMESTAMP = "store-timestamp";
+
+ }
+
+ public static final class MessageConstants {
+ public static final String MESSAGE_TYPE = "message-type";
+ public static final String NODE_ID = "nodeId";
+ public static final String DATAVERSE = "dataverse";
+ public static final String FEED = "feed";
+ public static final String DATASET = "dataset";
+ public static final String AQL = "aql";
+ public static final String RUNTIME_TYPE = "runtime-type";
+ public static final String PARTITION = "partition";
+ public static final String INTAKE_PARTITION = "intake-partition";
+ public static final String INFLOW_RATE = "inflow-rate";
+ public static final String OUTFLOW_RATE = "outflow-rate";
+ public static final String MODE = "mode";
+ public static final String CURRENT_CARDINALITY = "current-cardinality";
+ public static final String REDUCED_CARDINALITY = "reduced-cardinality";
+ public static final String VALUE_TYPE = "value-type";
+ public static final String VALUE = "value";
+ public static final String CPU_LOAD = "cpu-load";
+ public static final String N_RUNTIMES = "n_runtimes";
+ public static final String HEAP_USAGE = "heap_usage";
+ public static final String OPERAND_ID = "operand-id";
+ public static final String COMPUTE_PARTITION_RETAIN_LIMIT = "compute-partition-retain-limit";
+ public static final String LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP = "last-persisted-tuple-intake_timestamp";
+ public static final String PERSISTENCE_DELAY_WITHIN_LIMIT = "persistence-delay-within-limit";
+ public static final String AVERAGE_PERSISTENCE_DELAY = "average-persistence-delay";
+ public static final String COMMIT_ACKS = "commit-acks";
+ public static final String MAX_WINDOW_ACKED = "max-window-acked";
+ public static final String BASE = "base";
+ public static final String NOT_APPLICABLE = "N/A";
+
+ }
+
+ public static final class NamingConstants {
+ public static final String LIBRARY_NAME_SEPARATOR = "#";
+ }
+
+ public static class JobConstants {
+ public static final int DEFAULT_FRAME_SIZE = 8192;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java
new file mode 100644
index 0000000..cd9dca4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.exceptions.FrameDataException;
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+public class FeedExceptionHandler implements IExceptionHandler {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
+
+ private final IHyracksTaskContext ctx;
+ private final FrameTupleAccessor fta;
+ private final RecordDescriptor recordDesc;
+ private final IFeedManager feedManager;
+ private final FeedConnectionId connectionId;
+
+ public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+ IFeedManager feedManager, FeedConnectionId connectionId) {
+ this.ctx = ctx;
+ this.fta = fta;
+ this.recordDesc = recordDesc;
+ this.feedManager = feedManager;
+ this.connectionId = connectionId;
+ }
+
+ public ByteBuffer handleException(Exception e, ByteBuffer frame) {
+ try {
+ if (e instanceof FrameDataException) {
+ fta.reset(frame);
+ FrameDataException fde = (FrameDataException) e;
+ int tupleIndex = fde.getTupleIndex();
+
+ // logging
+ try {
+ logExceptionCausingTuple(tupleIndex, e);
+ } catch (Exception ex) {
+ ex.addSuppressed(e);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to log exception causing tuple due to..." + ex.getMessage());
+ }
+ }
+ // slicing
+ return FeedFrameUtil.getSlicedFrame(ctx, tupleIndex, fta);
+ } else {
+ return null;
+ }
+ } catch (Exception exception) {
+ exception.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to handle exception " + exception.getMessage());
+ }
+ return null;
+ }
+ }
+
+ private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
+
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+
+ int start = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength();
+ bbis.setByteBuffer(fta.getBuffer(), start);
+
+ Object[] record = new Object[recordDesc.getFieldCount()];
+
+ for (int i = 0; i < record.length; ++i) {
+ Object instance = recordDesc.getFields()[i].deserialize(di);
+ if (i == 0) {
+ String tuple = String.valueOf(instance);
+ feedManager.getFeedMetadataManager().logTuple(connectionId, tuple, e.getMessage(), feedManager);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(", " + String.valueOf(instance));
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java
new file mode 100644
index 0000000..310e7c0
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Allows caching of feed frames. This class is used in providing upstream backup.
+ * The tuples at the intake layer are held in this cache until these are acked by
+ * the storage layer post their persistence. On receiving an ack, appropriate tuples
+ * (recordsId < ackedRecordId) are dropped from the cache.
+ */
+public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
+
+ /**
+ * Value represents a cache feed frame
+ * Key represents the largest record Id in the frame.
+ * At the intake side, the largest record id corresponds to the last record in the frame
+ **/
+ private final Map<Integer, ByteBuffer> orderedCache;
+ private final FrameTupleAccessor tupleAccessor;
+ private final IFrameWriter frameWriter;
+ private final IHyracksTaskContext ctx;
+
+ public FeedFrameCache(IHyracksTaskContext ctx, FrameTupleAccessor tupleAccessor, IFrameWriter frameWriter) {
+ this.tupleAccessor = tupleAccessor;
+ this.frameWriter = frameWriter;
+ /** A LinkedHashMap ensures entries are retrieved in order of their insertion **/
+ this.orderedCache = new LinkedHashMap<Integer, ByteBuffer>();
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void processMessage(ByteBuffer frame) throws Exception {
+ int lastRecordId = getLastRecordId(frame);
+ ByteBuffer clone = cloneFrame(frame);
+ orderedCache.put(lastRecordId, clone);
+ }
+
+ public void dropTillRecordId(int recordId) {
+ List<Integer> dropRecordIds = new ArrayList<Integer>();
+ for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+ int recId = entry.getKey();
+ if (recId <= recordId) {
+ dropRecordIds.add(recId);
+ } else {
+ break;
+ }
+ }
+ for (Integer r : dropRecordIds) {
+ orderedCache.remove(r);
+ }
+ }
+
+ public void replayRecords(int startingRecordId) throws HyracksDataException {
+ boolean replayPositionReached = false;
+ for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+ // the key increases monotonically
+ int maxRecordIdInFrame = entry.getKey();
+ if (!replayPositionReached) {
+ if (startingRecordId < maxRecordIdInFrame) {
+ replayFrame(startingRecordId, entry.getValue());
+ break;
+ } else {
+ continue;
+ }
+ }
+ }
+ }
+
+ /**
+ * Replay the frame from the tuple (inclusive) with recordId as specified.
+ *
+ * @param recordId
+ * @param frame
+ * @throws HyracksDataException
+ */
+ private void replayFrame(int recordId, ByteBuffer frame) throws HyracksDataException {
+ tupleAccessor.reset(frame);
+ int nTuples = tupleAccessor.getTupleCount();
+ for (int i = 0; i < nTuples; i++) {
+ int rid = getRecordIdAtTupleIndex(i, frame);
+ if (rid == recordId) {
+ ByteBuffer slicedFrame = splitFrame(i, frame);
+ replayFrame(slicedFrame);
+ break;
+ }
+ }
+ }
+
+ private ByteBuffer splitFrame(int beginTupleIndex, ByteBuffer frame) throws HyracksDataException {
+ IFrame slicedFrame = new VSizeFrame(ctx);
+ FrameTupleAppender appender = new FrameTupleAppender();
+ appender.reset(slicedFrame, true);
+ int totalTuples = tupleAccessor.getTupleCount();
+ for (int ti = beginTupleIndex; ti < totalTuples; ti++) {
+ appender.append(tupleAccessor, ti);
+ }
+ return slicedFrame.getBuffer();
+ }
+
+ /**
+ * Replay the frame
+ *
+ * @param frame
+ * @throws HyracksDataException
+ */
+ private void replayFrame(ByteBuffer frame) throws HyracksDataException {
+ frameWriter.nextFrame(frame);
+ }
+
+ private int getLastRecordId(ByteBuffer frame) {
+ tupleAccessor.reset(frame);
+ int nTuples = tupleAccessor.getTupleCount();
+ return getRecordIdAtTupleIndex(nTuples - 1, frame);
+ }
+
+ private int getRecordIdAtTupleIndex(int tupleIndex, ByteBuffer frame) {
+ tupleAccessor.reset(frame);
+ int recordStart = tupleAccessor.getTupleStartOffset(tupleIndex) + tupleAccessor.getFieldSlotsLength();
+ int openPartOffset = frame.getInt(recordStart + 6);
+ int numOpenFields = frame.getInt(recordStart + openPartOffset);
+ int recordIdOffset = frame.getInt(recordStart + openPartOffset + 4 + numOpenFields * 8
+ + StatisticsConstants.INTAKE_TUPLEID.length() + 2 + 1);
+ int lastRecordId = frame.getInt(recordStart + recordIdOffset);
+ return lastRecordId;
+ }
+
+ private ByteBuffer cloneFrame(ByteBuffer frame) {
+ ByteBuffer clone = ByteBuffer.allocate(frame.capacity());
+ System.arraycopy(frame.array(), 0, clone.array(), 0, frame.limit());
+ return clone;
+ }
+
+ public void replayAll() throws HyracksDataException {
+ for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+ ByteBuffer frame = entry.getValue();
+ frameWriter.nextFrame(frame);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java
new file mode 100644
index 0000000..e8e424c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * Copyright 2009-2013 by The Regents of the University of California
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IMessageReceiver;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedFrameCollector extends MessageReceiver<DataBucket> implements IMessageReceiver<DataBucket> {
+
+ private final FeedConnectionId connectionId;
+ private final FrameDistributor frameDistributor;
+ private FeedPolicyAccessor fpa;
+ private IFrameWriter frameWriter;
+ private State state;
+
+ public enum State {
+ ACTIVE,
+ FINISHED,
+ TRANSITION,
+ HANDOVER
+ }
+
+ public FeedFrameCollector(FrameDistributor frameDistributor, FeedPolicyAccessor feedPolicyAccessor,
+ IFrameWriter frameWriter, FeedConnectionId connectionId) {
+ super();
+ this.frameDistributor = frameDistributor;
+ this.fpa = feedPolicyAccessor;
+ this.connectionId = connectionId;
+ this.frameWriter = frameWriter;
+ this.state = State.ACTIVE;
+ }
+
+ @Override
+ public void processMessage(DataBucket bucket) throws Exception {
+ try {
+ ByteBuffer frame = bucket.getContent();
+ switch (bucket.getContentType()) {
+ case DATA:
+ frameWriter.nextFrame(frame);
+ break;
+ case EOD:
+ closeCollector();
+ break;
+ case EOSD:
+ throw new AsterixException("Received data bucket with content of type " + bucket.getContentType());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to process data bucket " + bucket + ", encountered exception " + e.getMessage());
+ }
+ } finally {
+ bucket.doneReading();
+ }
+ }
+
+ public void closeCollector() {
+ if (state.equals(State.TRANSITION)) {
+ super.close(true);
+ setState(State.ACTIVE);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(this + " is now " + State.ACTIVE + " mode, processing frames synchronously");
+ }
+ } else {
+ flushPendingMessages();
+ setState(State.FINISHED);
+ synchronized (frameDistributor.getRegisteredCollectors()) {
+ frameDistributor.getRegisteredCollectors().notifyAll();
+ }
+ disconnect();
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed collector " + this);
+ }
+ }
+
+ public synchronized void disconnect() {
+ setState(State.FINISHED);
+ }
+
+ public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+ frameWriter.nextFrame(frame);
+ }
+
+ public FeedPolicyAccessor getFeedPolicyAccessor() {
+ return fpa;
+ }
+
+ public synchronized State getState() {
+ return state;
+ }
+
+ public synchronized void setState(State state) {
+ this.state = state;
+ switch (state) {
+ case FINISHED:
+ case HANDOVER:
+ notifyAll();
+ break;
+ default:
+ break;
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Frame Collector " + this.frameDistributor.getFeedRuntimeType() + " switched to " + state);
+ }
+ }
+
+ public IFrameWriter getFrameWriter() {
+ return frameWriter;
+ }
+
+ public void setFrameWriter(IFrameWriter frameWriter) {
+ this.frameWriter = frameWriter;
+ }
+
+ @Override
+ public String toString() {
+ return "FrameCollector " + connectionId + "," + state + "]";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o instanceof FeedFrameCollector) {
+ return connectionId.equals(((FeedFrameCollector) o).connectionId);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return connectionId.toString().hashCode();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java
new file mode 100644
index 0000000..437ed95
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class FeedFrameDiscarder {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
+
+ private final IHyracksTaskContext ctx;
+ private final FeedRuntimeInputHandler inputHandler;
+ private final FeedConnectionId connectionId;
+ private final FeedRuntimeId runtimeId;
+ private final FeedPolicyAccessor policyAccessor;
+ private final float maxFractionDiscard;
+ private int nDiscarded;
+
+ public FeedFrameDiscarder(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ FeedPolicyAccessor policyAccessor, FeedRuntimeInputHandler inputHandler) throws IOException {
+ this.ctx = ctx;
+ this.connectionId = connectionId;
+ this.runtimeId = runtimeId;
+ this.policyAccessor = policyAccessor;
+ this.inputHandler = inputHandler;
+ this.maxFractionDiscard = policyAccessor.getMaxFractionDiscard();
+ }
+
+ public boolean processMessage(ByteBuffer message) {
+ if (policyAccessor.getMaxFractionDiscard() != 0) {
+ long nProcessed = inputHandler.getProcessed();
+ long discardLimit = (long) (nProcessed * maxFractionDiscard);
+ if (nDiscarded >= discardLimit) {
+ return false;
+ }
+ nDiscarded++;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Discarded frame by " + connectionId + " (" + runtimeId + ")" + " count so far ("
+ + nDiscarded + ") Limit [" + discardLimit + "]");
+ }
+ return true;
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java
new file mode 100644
index 0000000..5c41fd4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedFrameHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FeedFrameHandlers {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
+
+ public enum RoutingMode {
+ IN_MEMORY_ROUTE,
+ SPILL_TO_DISK,
+ DISCARD
+ }
+
+ public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
+ RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
+ IFeedFrameHandler handler = null;
+ switch (routingMode) {
+ case IN_MEMORY_ROUTE:
+ handler = new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
+ break;
+ case SPILL_TO_DISK:
+ handler = new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
+ break;
+ case DISCARD:
+ handler = new DiscardRouter(distributor, feedId, runtimeType, partition);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid routing mode" + routingMode);
+ }
+ return handler;
+ }
+
+ public static class DiscardRouter implements IFeedFrameHandler {
+
+ private final FeedId feedId;
+ private int nDiscarded;
+ private final FeedRuntimeType runtimeType;
+ private final int partition;
+ private final FrameDistributor distributor;
+
+ public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
+ throws HyracksDataException {
+ this.distributor = distributor;
+ this.feedId = feedId;
+ this.nDiscarded = 0;
+ this.runtimeType = runtimeType;
+ this.partition = partition;
+ }
+
+ @Override
+ public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+ FrameTupleAccessor fta = distributor.getFta();
+ fta.reset(frame);
+ int nTuples = fta.getTupleCount();
+ nDiscarded += nTuples;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Discarded additional [" + runtimeType + "]" + "(" + partition + ")" + " " + nTuples);
+ }
+ }
+
+ @Override
+ public void handleDataBucket(DataBucket bucket) {
+ nDiscarded++;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Discard Count" + nDiscarded);
+ }
+ }
+
+ @Override
+ public void close() {
+ // do nothing, no resource to relinquish
+ }
+
+ @Override
+ public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+ throw new IllegalStateException("Invalid operation");
+ }
+
+ @Override
+ public String toString() {
+ return "DiscardRouter" + "[" + feedId + "]" + "(" + nDiscarded + ")";
+ }
+
+ @Override
+ public String getSummary() {
+ return new String("Number of discarded frames (since last reset)" + " feedId " + "[" + feedId + "]" + "("
+ + nDiscarded + ")");
+ }
+
+ }
+
+ public static class InMemoryRouter implements IFeedFrameHandler {
+
+ private final Collection<FeedFrameCollector> frameCollectors;
+
+ public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType, int partition) {
+ this.frameCollectors = frameCollectors;
+ }
+
+ @Override
+ public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+ throw new IllegalStateException("Operation not supported");
+ }
+
+ @Override
+ public void handleDataBucket(DataBucket bucket) {
+ for (FeedFrameCollector collector : frameCollectors) {
+ collector.sendMessage(bucket); // asynchronous call
+ }
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ @Override
+ public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+ throw new IllegalStateException("Operation not supported");
+ }
+
+ @Override
+ public String getSummary() {
+ return "InMemoryRouter Summary";
+ }
+ }
+
+ public static class DiskSpiller implements IFeedFrameHandler {
+
+ private final FeedId feedId;
+ private FrameSpiller<ByteBuffer> receiver;
+ private Iterator<ByteBuffer> iterator;
+
+ public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
+ int frameSize) throws IOException {
+ this.feedId = feedId;
+ receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
+ }
+
+ @Override
+ public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+ receiver.sendMessage(frame);
+ }
+
+ private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
+
+ private final int frameSize;
+ private final FeedId feedId;
+ private BufferedOutputStream bos;
+ private final ByteBuffer reusableLengthBuffer;
+ private final ByteBuffer reusableDataBuffer;
+ private long offset;
+ private File file;
+ private final FrameDistributor frameDistributor;
+ private boolean fileCreated = false;
+
+ public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
+ this.feedId = feedId;
+ this.frameSize = frameSize;
+ this.frameDistributor = distributor;
+ reusableLengthBuffer = ByteBuffer.allocate(4);
+ reusableDataBuffer = ByteBuffer.allocate(frameSize);
+ this.offset = 0;
+ }
+
+ @Override
+ public void processMessage(ByteBuffer message) throws Exception {
+ if (!fileCreated) {
+ createFile();
+ fileCreated = true;
+ }
+ reusableLengthBuffer.flip();
+ reusableLengthBuffer.putInt(message.array().length);
+ bos.write(reusableLengthBuffer.array());
+ bos.write(message.array());
+ }
+
+ private void createFile() throws IOException {
+ Date date = new Date();
+ String dateSuffix = date.toString().replace(' ', '_');
+ String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
+ + frameDistributor.getPartition() + "_" + dateSuffix;
+
+ file = new File(fileName);
+ if (!file.exists()) {
+ boolean success = file.createNewFile();
+ if (!success) {
+ throw new IOException("Unable to create spill file for feed " + feedId);
+ }
+ }
+ bos = new BufferedOutputStream(new FileOutputStream(file));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Created Spill File for feed " + feedId);
+ }
+ }
+
+ @SuppressWarnings("resource")
+ public Iterator<ByteBuffer> replayData() throws Exception {
+ final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
+ bis.skip(offset);
+ return new Iterator<ByteBuffer>() {
+
+ @Override
+ public boolean hasNext() {
+ boolean more = false;
+ try {
+ more = bis.available() > 0;
+ if (!more) {
+ bis.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return more;
+ }
+
+ @Override
+ public ByteBuffer next() {
+ reusableLengthBuffer.flip();
+ try {
+ bis.read(reusableLengthBuffer.array());
+ reusableLengthBuffer.flip();
+ int frameSize = reusableLengthBuffer.getInt();
+ reusableDataBuffer.flip();
+ bis.read(reusableDataBuffer.array(), 0, frameSize);
+ offset += 4 + frameSize;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return reusableDataBuffer;
+ }
+
+ @Override
+ public void remove() {
+ }
+
+ };
+ }
+
+ }
+
+ @Override
+ public void handleDataBucket(DataBucket bucket) {
+ throw new IllegalStateException("Operation not supported");
+ }
+
+ @Override
+ public void close() {
+ receiver.close(true);
+ }
+
+ @Override
+ public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+ try {
+ iterator = receiver.replayData();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ return iterator;
+ }
+
+ //TODO: Form a summary that includes stats related to what has been spilled to disk
+ @Override
+ public String getSummary() {
+ return "Disk Spiller Summary";
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java
new file mode 100644
index 0000000..db8145f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class FeedFrameSpiller {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
+
+ private final IHyracksTaskContext ctx;
+ private final FeedConnectionId connectionId;
+ private final FeedRuntimeId runtimeId;
+ private final FeedPolicyAccessor policyAccessor;
+ private BufferedOutputStream bos;
+ private File file;
+ private boolean fileCreated = false;
+ private long bytesWritten = 0;
+ private int spilledFrameCount = 0;
+
+ public FeedFrameSpiller(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+ FeedPolicyAccessor policyAccessor) throws IOException {
+ this.ctx = ctx;
+ this.connectionId = connectionId;
+ this.runtimeId = runtimeId;
+ this.policyAccessor = policyAccessor;
+ }
+
+ public boolean processMessage(ByteBuffer message) throws Exception {
+ if (!fileCreated) {
+ createFile();
+ fileCreated = true;
+ }
+ long maxAllowed = policyAccessor.getMaxSpillOnDisk();
+ if (maxAllowed != FeedPolicyAccessor.NO_LIMIT && bytesWritten + message.array().length > maxAllowed) {
+ return false;
+ } else {
+ bos.write(message.array());
+ bytesWritten += message.array().length;
+ spilledFrameCount++;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Spilled frame by " + runtimeId + " spill count " + spilledFrameCount);
+ }
+ return true;
+ }
+ }
+
+ private void createFile() throws IOException {
+ Date date = new Date();
+ String dateSuffix = date.toString().replace(' ', '_');
+ String fileName = connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
+ + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition() + "_" + dateSuffix;
+
+ file = new File(fileName);
+ if (!file.exists()) {
+ boolean success = file.createNewFile();
+ if (!success) {
+ throw new IOException("Unable to create spill file " + fileName + " for feed " + runtimeId);
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Created spill file " + file.getAbsolutePath());
+ }
+ }
+ }
+ bos = new BufferedOutputStream(new FileOutputStream(file));
+
+ }
+
+ public Iterator<ByteBuffer> replayData() throws Exception {
+ bos.flush();
+ return new FrameIterator(ctx, file.getName());
+ }
+
+ private static class FrameIterator implements Iterator<ByteBuffer> {
+
+ private final BufferedInputStream bis;
+ private final IHyracksTaskContext ctx;
+ private int readFrameCount = 0;
+
+ public FrameIterator(IHyracksTaskContext ctx, String filename) throws FileNotFoundException {
+ bis = new BufferedInputStream(new FileInputStream(new File(filename)));
+ this.ctx = ctx;
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean more = false;
+ try {
+ more = bis.available() > 0;
+ if (!more) {
+ bis.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return more;
+ }
+
+ @Override
+ public ByteBuffer next() {
+ IFrame frame = null;
+ try {
+ frame = new VSizeFrame(ctx);
+ Arrays.fill(frame.getBuffer().array(), (byte) 0);
+ bis.read(frame.getBuffer().array(), 0, frame.getFrameSize());
+ readFrameCount++;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Read spill frome " + readFrameCount);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return frame.getBuffer();
+ }
+
+ @Override
+ public void remove() {
+ }
+
+ }
+
+ public void reset() {
+ bytesWritten = 0;
+ // file.delete();
+ fileCreated = false;
+ bos = null;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resetted the FrameSpiller!");
+ }
+ }
+
+ public void close() {
+ if (bos != null) {
+ try {
+ bos.flush();
+ bos.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
\ No newline at end of file