You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:13 UTC

[16/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java
new file mode 100644
index 0000000..aa114e6
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixFeedProperties.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.config;
+
+/**
+ * Typed getters for the feed-related configuration properties. Each getter asks
+ * the {@link AsterixPropertiesAccessor} supplied at construction time for its
+ * key, passing along the default value declared in this class.
+ */
+public class AsterixFeedProperties extends AbstractAsterixProperties {
+
+    /** Port at which the Central Feed Manager listens for control messages from local Feed Managers. */
+    private static final String FEED_CENTRAL_MANAGER_PORT_KEY = "feed.central.manager.port";
+    private static final int FEED_CENTRAL_MANAGER_PORT_DEFAULT = 4500;
+
+    /** Global feed memory budget; the default is 64MB, i.e. 2048 frames assuming a frame size of 32768. */
+    private static final String FEED_MEMORY_GLOBALBUDGET_KEY = "feed.memory.global.budget";
+    private static final long FEED_MEMORY_GLOBALBUDGET_DEFAULT = 64L * 1024 * 1024;
+
+    /** Timeout for waiting on available feed memory; the default is 10 seconds. */
+    private static final String FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_KEY = "feed.memory.available.wait.timeout";
+    private static final long FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_DEFAULT = 10L;
+
+    /** Maximum length of the input queue before corrective action is triggered. */
+    private static final String FEED_PENDING_WORK_THRESHOLD_KEY = "feed.pending.work.threshold";
+    private static final int FEED_PENDING_WORK_THRESHOLD_DEFAULT = 50;
+
+    /** Setting published under {@code feed.max.threshold.period}; the default is 5. */
+    private static final String FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_KEY = "feed.max.threshold.period";
+    private static final int FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_DEFAULT = 5;
+
+    /**
+     * @param accessor
+     *            accessor the property values are read from
+     */
+    public AsterixFeedProperties(AsterixPropertiesAccessor accessor) {
+        super(accessor);
+    }
+
+    /** @return the value looked up for {@code feed.memory.global.budget} */
+    public long getMemoryComponentGlobalBudget() {
+        return lookupLong(FEED_MEMORY_GLOBALBUDGET_KEY, FEED_MEMORY_GLOBALBUDGET_DEFAULT);
+    }
+
+    /** @return the value looked up for {@code feed.memory.available.wait.timeout} */
+    public long getMemoryAvailableWaitTimeout() {
+        return lookupLong(FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_KEY, FEED_MEMORY_AVAILABLE_WAIT_TIMEOUT_DEFAULT);
+    }
+
+    /** @return the value looked up for {@code feed.central.manager.port} */
+    public int getFeedCentralManagerPort() {
+        return lookupInt(FEED_CENTRAL_MANAGER_PORT_KEY, FEED_CENTRAL_MANAGER_PORT_DEFAULT);
+    }
+
+    /** @return the value looked up for {@code feed.pending.work.threshold} */
+    public int getPendingWorkThreshold() {
+        return lookupInt(FEED_PENDING_WORK_THRESHOLD_KEY, FEED_PENDING_WORK_THRESHOLD_DEFAULT);
+    }
+
+    /** @return the value looked up for {@code feed.max.threshold.period} */
+    public int getMaxSuccessiveThresholdPeriod() {
+        return lookupInt(FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_KEY,
+                FEED_MAX_SUCCESSIVE_THRESHOLD_PERIOD_DEFAULT);
+    }
+
+    /** Looks up a long-valued property, handing {@code fallback} to the accessor as the default. */
+    private long lookupLong(String key, long fallback) {
+        return accessor.getProperty(key, fallback, PropertyInterpreters.getLongPropertyInterpreter());
+    }
+
+    /** Looks up an int-valued property, handing {@code fallback} to the accessor as the default. */
+    private int lookupInt(String key, int fallback) {
+        return accessor.getProperty(key, fallback, PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
index e696519..32386f6 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/IAsterixPropertiesProvider.java
@@ -24,4 +24,6 @@ public interface IAsterixPropertiesProvider {
     public AsterixMetadataProperties getMetadataProperties();
 
     public AsterixExternalProperties getExternalProperties();
+    
+    public AsterixFeedProperties getFeedProperties();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java
new file mode 100644
index 0000000..4651607
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/exceptions/FrameDataException.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.exceptions;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A {@link HyracksDataException} that also carries a tuple index, presumably the
+ * position of the offending tuple within its frame (not verifiable from this class).
+ */
+public class FrameDataException extends HyracksDataException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Index handed to the constructor; never changes afterwards. */
+    private final int tupleIndex;
+
+    /**
+     * @param tupleIndex
+     *            index later reported by {@link #getTupleIndex()}
+     * @param cause
+     *            underlying exception, passed on to the superclass
+     */
+    public FrameDataException(int tupleIndex, Exception cause) {
+        super(cause);
+        this.tupleIndex = tupleIndex;
+    }
+
+    /** @return the tuple index supplied at construction time */
+    public int getTupleIndex() {
+        return tupleIndex;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java
new file mode 100644
index 0000000..ba38f87
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/BasicMonitoredBuffer.java
@@ -0,0 +1,69 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * A {@link MonitoredBuffer} with everything switched off: each boolean hook
+ * answers {@code false} and no frame pre/post-processor is supplied.
+ */
+public class BasicMonitoredBuffer extends MonitoredBuffer {
+
+    /** Forwards every argument unchanged to the {@link MonitoredBuffer} constructor. */
+    public BasicMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+    }
+
+    /** @return {@code false}; the processing-rate monitoring hook is declined */
+    @Override
+    protected boolean monitorProcessingRate() {
+        return false;
+    }
+
+    /** @return {@code false}; the input-queue-length monitoring hook is declined */
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return false;
+    }
+
+    /** @return {@code false}; the inflow/outflow logging hook is declined */
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return false;
+    }
+
+    /** @return {@code false}; the inflow-rate reporting hook is declined */
+    @Override
+    protected boolean reportInflowRate() {
+        return false;
+    }
+
+    /** @return {@code false}; the outflow-rate reporting hook is declined */
+    @Override
+    protected boolean reportOutflowRate() {
+        return false;
+    }
+
+    /** @return {@code null}; this buffer supplies no frame pre-processor */
+    @Override
+    protected IFramePreprocessor getFramePreProcessor() {
+        return null;
+    }
+
+    /** @return {@code null}; this buffer supplies no frame post-processor */
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java
new file mode 100644
index 0000000..e324617
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/CollectionRuntime.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector.State;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.asterix.common.feeds.api.ISubscriberRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Represents the feed runtime that collects feed tuples from another feed.
+ * In case of a primary feed, the CollectionRuntime collects tuples from the feed
+ * intake job. For a secondary feed, tuples are collected from the intake/compute
+ * runtime associated with the source feed.
+ */
+public class CollectionRuntime extends FeedRuntime implements ISubscriberRuntime {
+
+    /** Connection id supplied at construction time. */
+    private final FeedConnectionId connectionId;
+
+    /** Source runtime supplied at construction time. */
+    private final ISubscribableRuntime sourceRuntime;
+
+    /** Feed policy parameters, returned as-is by {@link #getFeedPolicy()}. */
+    private final Map<String, String> feedPolicy;
+
+    /** Installed through {@link #setFrameCollector(FeedFrameCollector)}; null until then. */
+    private FeedFrameCollector frameCollector;
+
+    /** Passes the runtime id and both handlers to {@link FeedRuntime} and stores the rest. */
+    public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter,
+            ISubscribableRuntime sourceRuntime, Map<String, String> feedPolicy) {
+        super(runtimeId, inputSideHandler, outputSideWriter);
+        this.connectionId = connectionId;
+        this.sourceRuntime = sourceRuntime;
+        this.feedPolicy = feedPolicy;
+    }
+
+    /**
+     * Blocks the caller until the frame collector reports the FINISHED or the
+     * HANDOVER state. Waiting is done on the collector's monitor, so waking up
+     * presumably depends on the collector notifying that monitor when its
+     * state changes (that code is not part of this class).
+     *
+     * @return the collector state read once the wait is over
+     * @throws InterruptedException
+     *             if the calling thread is interrupted while waiting
+     */
+    public State waitTillCollectionOver() throws InterruptedException {
+        if (isCollectionOver()) {
+            return frameCollector.getState();
+        }
+        synchronized (frameCollector) {
+            while (!isCollectionOver()) {
+                frameCollector.wait();
+            }
+        }
+        return frameCollector.getState();
+    }
+
+    /** @return true when the collector state equals FINISHED or HANDOVER */
+    private boolean isCollectionOver() {
+        if (frameCollector.getState().equals(State.FINISHED)) {
+            return true;
+        }
+        return frameCollector.getState().equals(State.HANDOVER);
+    }
+
+    /** Applies the given mode to this runtime's input handler. */
+    public void setMode(Mode mode) {
+        getInputHandler().setMode(mode);
+    }
+
+    /** @return the feed policy map given to the constructor */
+    @Override
+    public Map<String, String> getFeedPolicy() {
+        return feedPolicy;
+    }
+
+    /** @return the connection id given to the constructor */
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /** @return the source runtime given to the constructor */
+    public ISubscribableRuntime getSourceRuntime() {
+        return sourceRuntime;
+    }
+
+    /** Installs the collector consulted by {@link #waitTillCollectionOver()}. */
+    public void setFrameCollector(FeedFrameCollector frameCollector) {
+        this.frameCollector = frameCollector;
+    }
+
+    /** @return the collector last installed, or null if none was set */
+    public FeedFrameCollector getFrameCollector() {
+        return frameCollector;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java
new file mode 100644
index 0000000..161f27a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/ComputeSideMonitoredBuffer.java
@@ -0,0 +1,70 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFrameEventCallback;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * A {@link MonitoredBuffer} that answers {@code true} to the processing-rate,
+ * inflow/outflow-logging and input-queue-length hooks, {@code false} to the
+ * inflow/outflow reporting hooks, and supplies no frame pre/post-processor.
+ */
+public class ComputeSideMonitoredBuffer extends MonitoredBuffer {
+
+    /** Forwards every argument unchanged to the {@link MonitoredBuffer} constructor. */
+    public ComputeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler,
+            IFrameWriter frameWriter, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedMetricCollector metricCollector, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IExceptionHandler exceptionHandler, IFrameEventCallback callback, int nPartitions,
+            FeedPolicyAccessor policyAccessor) {
+        super(ctx, inputHandler, frameWriter, fta, recordDesc, metricCollector, connectionId, runtimeId,
+                exceptionHandler, callback, nPartitions, policyAccessor);
+    }
+
+    /** @return {@code true}; the processing-rate monitoring hook is accepted */
+    @Override
+    protected boolean monitorProcessingRate() {
+        return true;
+    }
+
+    /** @return {@code true}; the inflow/outflow logging hook is accepted */
+    @Override
+    protected boolean logInflowOutflowRate() {
+        return true;
+    }
+
+    /** @return {@code true}; the input-queue-length monitoring hook is accepted */
+    @Override
+    protected boolean monitorInputQueueLength() {
+        return true;
+    }
+
+    /** @return {@code null}; this buffer supplies no frame pre-processor */
+    @Override
+    protected IFramePreprocessor getFramePreProcessor() {
+        return null;
+    }
+
+    /** @return {@code null}; this buffer supplies no frame post-processor */
+    @Override
+    protected IFramePostProcessor getFramePostProcessor() {
+        return null;
+    }
+
+    /** @return {@code false}; the inflow-rate reporting hook is declined */
+    @Override
+    protected boolean reportInflowRate() {
+        return false;
+    }
+
+    /** @return {@code false}; the outflow-rate reporting hook is declined */
+    @Override
+    protected boolean reportOutflowRate() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java
new file mode 100644
index 0000000..ea4480e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucket.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A frame-sized buffer obtained from a {@link DataBucketPool}. The bucket counts
+ * completed reads and hands itself back to its pool once the count reaches the
+ * desired read count.
+ */
+public class DataBucket {
+
+    /** Source of the per-bucket ids; shared by all buckets. */
+    private static final AtomicInteger globalBucketId = new AtomicInteger(0);
+
+    /** Backing buffer, allocated once with the pool's frame size. */
+    private final ByteBuffer content;
+
+    /** Number of reads completed since the bucket was last returned to the pool. */
+    private final AtomicInteger readCount;
+
+    /** Id assigned at construction; only used by {@link #toString()}. */
+    private final int bucketId;
+
+    /** Read count at which {@link #doneReading()} returns the bucket to the pool. */
+    private int desiredReadCount;
+
+    /** Kind of content currently carried; starts out as {@link ContentType#DATA}. */
+    private ContentType contentType;
+
+    /** Pool this bucket is returned to. */
+    private final DataBucketPool pool;
+
+    /** Distinguishes regular data from the two signal markers. */
+    public enum ContentType {
+        /** data (feed tuple) */
+        DATA,
+        /** A signal indicating that there shall be no more data */
+        EOD,
+        /** End of processing of spilled data */
+        EOSD
+    }
+
+    /**
+     * Allocates a heap buffer of {@code pool.getFrameSize()} bytes and draws a
+     * fresh bucket id.
+     *
+     * @param pool
+     *            pool that owns this bucket
+     */
+    public DataBucket(DataBucketPool pool) {
+        this.pool = pool;
+        this.content = ByteBuffer.allocate(pool.getFrameSize());
+        this.readCount = new AtomicInteger(0);
+        this.contentType = ContentType.DATA;
+        this.bucketId = globalBucketId.incrementAndGet();
+    }
+
+    /**
+     * Copies the first {@code frame.limit()} bytes of the frame's backing array
+     * into this bucket and leaves the bucket's buffer at position 0 with the
+     * same limit. A null frame leaves the bucket untouched.
+     *
+     * @param frame
+     *            array-backed source frame, or null
+     */
+    public synchronized void reset(ByteBuffer frame) {
+        if (frame == null) {
+            return;
+        }
+        content.flip();
+        final int length = frame.limit();
+        System.arraycopy(frame.array(), 0, content.array(), 0, length);
+        content.limit(length);
+        content.position(0);
+    }
+
+    /**
+     * Counts one finished read; when the count equals the desired read count
+     * the counter is zeroed and the bucket goes back to its pool.
+     */
+    public synchronized void doneReading() {
+        if (readCount.incrementAndGet() != desiredReadCount) {
+            return;
+        }
+        readCount.set(0);
+        pool.returnDataBucket(this);
+    }
+
+    /** Sets the number of reads after which the bucket is returned to the pool. */
+    public void setDesiredReadCount(int rCount) {
+        this.desiredReadCount = rCount;
+    }
+
+    /** @return the content type last set, {@link ContentType#DATA} initially */
+    public ContentType getContentType() {
+        return contentType;
+    }
+
+    /** Marks the kind of content this bucket carries. */
+    public void setContentType(ContentType contentType) {
+        this.contentType = contentType;
+    }
+
+    /** @return the backing buffer itself, not a copy */
+    public synchronized ByteBuffer getContent() {
+        return content;
+    }
+
+    @Override
+    public String toString() {
+        return "DataBucket [" + bucketId + "] (" + readCount + "," + desiredReadCount + ")";
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java
new file mode 100644
index 0000000..1a66b38
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DataBucketPool.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.Stack;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+
+/**
+ * Represents a pool of reusable {@link DataBucket} instances. Buckets are
+ * handed out by {@link #getDataBucket()} and come back through
+ * {@link #returnDataBucket(DataBucket)}.
+ */
+public class DataBucketPool implements IFeedMemoryComponent {
+
+    /** A unique identifier for the memory component **/
+    private final int componentId;
+
+    /** The {@link IFeedMemoryManager} for the NodeController **/
+    private final IFeedMemoryManager memoryManager;
+
+    /** A collection of available data buckets {@link DataBucket} **/
+    private final Stack<DataBucket> pool;
+
+    /** The total number of data buckets {@link DataBucket} allocated **/
+    private int totalAllocation;
+
+    /** The fixed frame size as configured for the asterix runtime **/
+    private final int frameSize;
+
+    /**
+     * @param componentId
+     *            identifier reported by {@link #getComponentId()}
+     * @param memoryManager
+     *            manager asked for more memory when the pool runs empty
+     * @param size
+     *            number of buckets created up front
+     * @param frameSize
+     *            size of the buffer allocated by each bucket
+     */
+    public DataBucketPool(int componentId, IFeedMemoryManager memoryManager, int size, int frameSize) {
+        this.componentId = componentId;
+        this.memoryManager = memoryManager;
+        this.frameSize = frameSize;
+        this.pool = new Stack<DataBucket>();
+        expand(size);
+    }
+
+    /** Puts a bucket back on top of the pool. */
+    public synchronized void returnDataBucket(DataBucket bucket) {
+        pool.push(bucket);
+    }
+
+    /**
+     * Hands out the bucket on top of the pool. When the pool is empty the
+     * memory manager is first asked to expand this component; the pop that
+     * follows assumes a granted expansion has added at least one bucket.
+     *
+     * @return a bucket, or null when the memory manager refuses to expand
+     */
+    public synchronized DataBucket getDataBucket() {
+        if (pool.isEmpty() && !memoryManager.expandMemoryComponent(this)) {
+            return null;
+        }
+        return pool.pop();
+    }
+
+    @Override
+    public Type getType() {
+        return Type.POOL;
+    }
+
+    @Override
+    public int getTotalAllocation() {
+        return totalAllocation;
+    }
+
+    @Override
+    public int getComponentId() {
+        return componentId;
+    }
+
+    /** Creates {@code delta} new buckets and adds them to the allocation total. */
+    @Override
+    public void expand(int delta) {
+        for (int created = 0; created < delta; created++) {
+            pool.push(new DataBucket(this));
+        }
+        totalAllocation += delta;
+    }
+
+    /** Discards the pooled buckets and subtracts them from the allocation total. */
+    @Override
+    public void reset() {
+        totalAllocation -= pool.size();
+        pool.clear();
+    }
+
+    @Override
+    public String toString() {
+        return "DataBucketPool[" + componentId + "](" + totalAllocation + ")";
+    }
+
+    /** @return the number of buckets currently sitting in the pool */
+    public int getSize() {
+        return pool.size();
+    }
+
+    /** @return the frame size given to the constructor */
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java
new file mode 100644
index 0000000..1e1baca
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/DistributeFeedFrameWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler.Type;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Provides mechanism for distributing the frames, as received from an operator to a
+ * set of registered readers. Each reader typically operates at a different pace. Readers
+ * are isolated from each other to ensure that a slow reader does not impact the progress of
+ * others.
+ **/
+public class DistributeFeedFrameWriter implements IFrameWriter {
+
+    private static final Logger LOGGER = Logger.getLogger(DistributeFeedFrameWriter.class.getName());
+
+    /** A unique identifier for the feed to which the incoming tuples belong. **/
+    private final FeedId feedId;
+
+    /** An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each operating in isolation. **/
+    private final FrameDistributor frameDistributor;
+
+    /** The original frame writer instantiated as part of job creation **/
+    private IFrameWriter writer;
+
+    /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
+    private final FeedRuntimeType feedRuntimeType;
+
+    /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
+    private final int partition;
+
+    /**
+     * Creates the writer and a {@link FrameDistributor} of its own, built from the
+     * task context, feed id, runtime type, partition, the feed manager's memory
+     * manager and the tuple accessor.
+     */
+    public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
+            FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta, IFeedManager feedManager)
+            throws IOException {
+        this.feedId = feedId;
+        this.frameDistributor = new FrameDistributor(ctx, feedId, feedRuntimeType, partition, true,
+                feedManager.getFeedMemoryManager(), fta);
+        this.feedRuntimeType = feedRuntimeType;
+        this.partition = partition;
+        this.writer = writer;
+    }
+
+    /**
+     * Registers a new {@link FeedFrameCollector} for the given writer.
+     *
+     * @return the newly registered collector
+     * @throws IllegalStateException
+     *             if the writer is already registered with the distributor
+     */
+    public FeedFrameCollector subscribeFeed(FeedPolicyAccessor fpa, IFrameWriter frameWriter,
+            FeedConnectionId connectionId) throws Exception {
+        if (frameDistributor.isRegistered(frameWriter)) {
+            throw new IllegalStateException("subscriber " + feedId + " already registered");
+        }
+        FeedFrameCollector collector = new FeedFrameCollector(frameDistributor, fpa, frameWriter, connectionId);
+        frameDistributor.registerFrameCollector(collector);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registered subscriber, new mode " + frameDistributor.getMode());
+        }
+        return collector;
+    }
+
+    /**
+     * Removes the collector registered for the given writer.
+     *
+     * @throws IllegalStateException
+     *             if the distributor reports that nothing was deregistered
+     */
+    public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
+        if (!frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter)) {
+            throw new IllegalStateException("Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter
+                    + " not registered.");
+        }
+    }
+
+    /** Passes the end-of-feed notification on to the frame distributor. */
+    public void notifyEndOfFeed() {
+        frameDistributor.notifyEndOfFeed();
+    }
+
+    /**
+     * Closes the frame distributor and then the wrapped writer. The writer is
+     * closed in a finally block so that a failure while closing the distributor
+     * cannot leave the wrapped writer open.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            frameDistributor.close();
+        } finally {
+            writer.close();
+        }
+    }
+
+    /** Signals failure to the wrapped writer only; the distributor is not involved. */
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    /** Hands the frame to the distributor; the wrapped writer is not called here. */
+    @Override
+    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        frameDistributor.nextFrame(frame);
+    }
+
+    /** Opens the wrapped writer. */
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    /** @return the distributor's map from registered writers to their collectors */
+    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
+        return frameDistributor.getRegisteredReaders();
+    }
+
+    /** Replaces the wrapped writer used by open, fail and close. */
+    public void setWriter(IFrameWriter writer) {
+        this.writer = writer;
+    }
+
+    /** @return always {@code DISTRIBUTE_FEED_OUTPUT_HANDLER} */
+    public Type getType() {
+        return IFeedOperatorOutputSideHandler.Type.DISTRIBUTE_FEED_OUTPUT_HANDLER;
+    }
+
+    @Override
+    public String toString() {
+        return feedId.toString() + feedRuntimeType + "[" + partition + "]";
+    }
+
+    /** @return the distribution mode currently reported by the frame distributor */
+    public FrameDistributor.DistributionMode getDistributionMode() {
+        return frameDistributor.getDistributionMode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java
new file mode 100644
index 0000000..dc19555
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedActivity.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.Map;
+
+/**
+ * Describes one activity of a feed: the dataverse, feed and dataset names, a
+ * mutable activity id and a map of free-form details keyed by the constants in
+ * {@link FeedActivityDetails}.
+ */
+public class FeedActivity implements Comparable<FeedActivity> {
+
+    /** Assigned through {@link #setActivityId(int)}; 0 until then. */
+    private int activityId;
+
+    private final String dataverseName;
+    private final String datasetName;
+    private final String feedName;
+    private final Map<String, String> feedActivityDetails;
+
+    /** Keys used in the activity details map. */
+    public static class FeedActivityDetails {
+        public static final String INTAKE_LOCATIONS = "intake-locations";
+        public static final String COMPUTE_LOCATIONS = "compute-locations";
+        public static final String STORAGE_LOCATIONS = "storage-locations";
+        public static final String COLLECT_LOCATIONS = "collect-locations";
+        public static final String FEED_POLICY_NAME = "feed-policy-name";
+        public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
+    }
+
+    /** Stores the names and the details map as given; the map is not copied. */
+    public FeedActivity(String dataverseName, String feedName, String datasetName,
+            Map<String, String> feedActivityDetails) {
+        this.dataverseName = dataverseName;
+        this.feedName = feedName;
+        this.datasetName = datasetName;
+        this.feedActivityDetails = feedActivityDetails;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    /**
+     * Two activities are equal when their dataverse, dataset and feed names and
+     * their activity ids all match; the details map is not compared.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof FeedActivity)) {
+            return false;
+        }
+        FeedActivity that = (FeedActivity) other;
+        return that.dataverseName.equals(dataverseName) && that.datasetName.equals(datasetName)
+                && that.getFeedName().equals(feedName) && that.getActivityId() == activityId;
+    }
+
+    /** Hashes the {@link #toString()} form, which is built from the fields {@code equals} compares. */
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
+    }
+
+    /** @return the details entry stored under {@code feed-connect-timestamp}, or null if absent */
+    public String getConnectTimestamp() {
+        return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
+    }
+
+    public int getActivityId() {
+        return activityId;
+    }
+
+    public void setActivityId(int activityId) {
+        this.activityId = activityId;
+    }
+
+    public Map<String, String> getFeedActivityDetails() {
+        return feedActivityDetails;
+    }
+
+    /**
+     * Orders activities by descending activity id (the larger id sorts first).
+     * The ids are compared directly rather than subtracted, because the
+     * difference of two ints can overflow and flip the sign of the result.
+     */
+    @Override
+    public int compareTo(FeedActivity o) {
+        int otherId = o.getActivityId();
+        if (otherId == activityId) {
+            return 0;
+        }
+        return otherId < activityId ? -1 : 1;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
new file mode 100644
index 0000000..161408a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedCollectRuntimeInputHandler.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * A {@link FeedRuntimeInputHandler} that additionally hands every incoming frame to a
+ * {@link FeedFrameCache} before processing it, and exposes the cache's replay and drop operations.
+ */
+public class FeedCollectRuntimeInputHandler extends FeedRuntimeInputHandler {
+
+    /** Receives each incoming frame and serves the replay/drop requests below. */
+    private final FeedFrameCache feedFrameCache;
+
+    /**
+     * Forwards all arguments to the superclass constructor and creates the frame cache from
+     * {@code ctx}, {@code fta} and {@code coreOperator}.
+     */
+    public FeedCollectRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled,
+            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions)
+            throws IOException {
+        super(ctx, connectionId, runtimeId, coreOperator, fpa, bufferingEnabled, fta, recordDesc, feedManager,
+                nPartitions);
+        feedFrameCache = new FeedFrameCache(ctx, fta, coreOperator);
+    }
+
+    /**
+     * Sends the frame to the cache first, then lets the superclass process it.
+     *
+     * @param frame
+     *            the incoming frame
+     * @throws HyracksDataException
+     *             if the superclass fails to process the frame
+     */
+    public void process(ByteBuffer frame) throws HyracksDataException {
+        this.feedFrameCache.sendMessage(frame);
+        super.process(frame);
+    }
+
+    /** Asks the cache to replay every frame it currently holds. */
+    public void replayCached() throws HyracksDataException {
+        this.feedFrameCache.replayAll();
+    }
+
+    /**
+     * Asks the cache to replay records starting at the given record id.
+     *
+     * @param recordId
+     *            the record id at which the replay starts
+     */
+    public void replayFrom(int recordId) throws HyracksDataException {
+        this.feedFrameCache.replayRecords(recordId);
+    }
+
+    /**
+     * Asks the cache to drop frames up to the given record id.
+     *
+     * @param recordId
+     *            the record id up to which cached frames are dropped
+     */
+    public void dropTill(int recordId) {
+        this.feedFrameCache.dropTillRecordId(recordId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java
new file mode 100644
index 0000000..1045e6e
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectJobInfo.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Job information for a job of type {@code FEED_CONNECT}: the connection it serves, the feed policy,
+ * the source and compute feed joints, and the collect/compute/storage location lists.
+ */
+public class FeedConnectJobInfo extends FeedJobInfo {
+
+    private final FeedConnectionId connectionId;
+    private final Map<String, String> feedPolicy;
+    private final IFeedJoint sourceFeedJoint;
+    private IFeedJoint computeFeedJoint;
+
+    /** Location lists are unset ({@code null}) until the corresponding setter is called. */
+    private List<String> collectLocations;
+    private List<String> computeLocations;
+    private List<String> storageLocations;
+
+    /**
+     * Creates the job info; the job type passed to the superclass is always {@code FEED_CONNECT}.
+     */
+    public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
+            IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
+            Map<String, String> feedPolicy) {
+        super(jobId, state, FeedJobInfo.JobType.FEED_CONNECT, spec);
+        this.feedPolicy = feedPolicy;
+        this.computeFeedJoint = computeFeedJoint;
+        this.sourceFeedJoint = sourceFeedJoint;
+        this.connectionId = connectionId;
+    }
+
+    /** @return the id of the feed connection this job belongs to */
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /** @return the feed policy parameters given at construction */
+    public Map<String, String> getFeedPolicy() {
+        return this.feedPolicy;
+    }
+
+    /** @return the source feed joint given at construction */
+    public IFeedJoint getSourceFeedJoint() {
+        return this.sourceFeedJoint;
+    }
+
+    /** @return the current compute feed joint */
+    public IFeedJoint getComputeFeedJoint() {
+        return this.computeFeedJoint;
+    }
+
+    /** Replaces the compute feed joint. */
+    public void setComputeFeedJoint(IFeedJoint computeFeedJoint) {
+        this.computeFeedJoint = computeFeedJoint;
+    }
+
+    /** @return the collect locations, or {@code null} if none have been set */
+    public List<String> getCollectLocations() {
+        return this.collectLocations;
+    }
+
+    /** Replaces the collect locations. */
+    public void setCollectLocations(List<String> collectLocations) {
+        this.collectLocations = collectLocations;
+    }
+
+    /** @return the compute locations, or {@code null} if none have been set */
+    public List<String> getComputeLocations() {
+        return this.computeLocations;
+    }
+
+    /** Replaces the compute locations. */
+    public void setComputeLocations(List<String> computeLocations) {
+        this.computeLocations = computeLocations;
+    }
+
+    /** @return the storage locations, or {@code null} if none have been set */
+    public List<String> getStorageLocations() {
+        return this.storageLocations;
+    }
+
+    /** Replaces the storage locations. */
+    public void setStorageLocations(List<String> storageLocations) {
+        this.storageLocations = storageLocations;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
index def7c10..e09a4fe 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionId.java
@@ -17,28 +17,27 @@ package edu.uci.ics.asterix.common.feeds;
 import java.io.Serializable;
 
 /**
- * A unique identifier for a data feed flowing into a dataset.
+ * A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing into a dataset.
  */
 public class FeedConnectionId implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final String dataverse;
-    private final String feedName;
+    private final FeedId feedId;
     private final String datasetName;
 
-    public FeedConnectionId(String dataverse, String feedName, String datasetName) {
-        this.dataverse = dataverse;
-        this.feedName = feedName;
+    public FeedConnectionId(FeedId feedId, String datasetName) {
+        this.feedId = feedId;
         this.datasetName = datasetName;
     }
 
-    public String getDataverse() {
-        return dataverse;
+    public FeedConnectionId(String dataverse, String feedName, String datasetName) {
+        this.feedId = new FeedId(dataverse, feedName);
+        this.datasetName = datasetName;
     }
 
-    public String getFeedName() {
-        return feedName;
+    public FeedId getFeedId() {
+        return feedId;
     }
 
     public String getDatasetName() {
@@ -50,9 +49,10 @@ public class FeedConnectionId implements Serializable {
         if (o == null || !(o instanceof FeedConnectionId)) {
             return false;
         }
-        if (((FeedConnectionId) o).getFeedName().equals(feedName)
-                && ((FeedConnectionId) o).getDataverse().equals(dataverse)
-                && ((FeedConnectionId) o).getDatasetName().equals(datasetName)) {
+
+        if (this == o
+                || (((FeedConnectionId) o).getFeedId().equals(feedId) && ((FeedConnectionId) o).getDatasetName()
+                        .equals(datasetName))) {
             return true;
         }
         return false;
@@ -65,6 +65,6 @@ public class FeedConnectionId implements Serializable {
 
     @Override
     public String toString() {
-        return dataverse + "." + feedName + "-->" + datasetName;
+        return feedId.toString() + "-->" + datasetName;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java
new file mode 100644
index 0000000..2343b40
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConnectionRequest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
+
+/**
+ * A request for connecting a feed to a dataset. Everything except the connection status is fixed at
+ * construction time; the status starts as {@link ConnectionStatus#INITIALIZED}.
+ */
+public class FeedConnectionRequest {
+
+    public enum ConnectionStatus {
+        /** initial state upon creating a connection request **/
+        INITIALIZED,
+
+        /** connection established; feed is receiving data **/
+        ACTIVE,
+
+        /** connection removed; feed is not receiving data **/
+        INACTIVE,
+
+        /** connection request failed **/
+        FAILED
+    }
+
+    /** Key of the feed joint on the feed pipeline that serves as the source for this connection **/
+    private final FeedJointKey feedJointKey;
+
+    /** Location in the source feed pipeline from where feed tuples are received **/
+    private final ConnectionLocation connectionLocation;
+
+    /** Functions that need to be applied in sequence after the data hand-off at the source feed joint **/
+    private final List<String> functionsToApply;
+
+    /** Target dataset associated with the connection request **/
+    private final String targetDataset;
+
+    /** Name of the policy that governs feed ingestion **/
+    private final String policy;
+
+    /** Parameters of the policy associated with the feed connection **/
+    private final Map<String, String> policyParameters;
+
+    /** Id of the receiving feed, as given at construction **/
+    private final FeedId receivingFeedId;
+
+    /** Current status of the request; the only mutable part of it **/
+    private ConnectionStatus connectionStatus;
+
+    /**
+     * Stores the given values unchanged and marks the request as {@link ConnectionStatus#INITIALIZED}.
+     */
+    public FeedConnectionRequest(FeedJointKey feedPointKey, ConnectionLocation connectionLocation,
+            List<String> functionsToApply, String targetDataset, String policy, Map<String, String> policyParameters,
+            FeedId receivingFeedId) {
+        this.connectionStatus = ConnectionStatus.INITIALIZED;
+        this.feedJointKey = feedPointKey;
+        this.connectionLocation = connectionLocation;
+        this.functionsToApply = functionsToApply;
+        this.targetDataset = targetDataset;
+        this.policy = policy;
+        this.policyParameters = policyParameters;
+        this.receivingFeedId = receivingFeedId;
+    }
+
+    /** @return the key of the source feed joint */
+    public FeedJointKey getFeedJointKey() {
+        return this.feedJointKey;
+    }
+
+    /** @return the connection location in the source feed pipeline */
+    public ConnectionLocation getSubscriptionLocation() {
+        return this.connectionLocation;
+    }
+
+    /** @return the functions to apply, in order */
+    public List<String> getFunctionsToApply() {
+        return this.functionsToApply;
+    }
+
+    /** @return the name of the target dataset */
+    public String getTargetDataset() {
+        return this.targetDataset;
+    }
+
+    /** @return the name of the ingestion policy */
+    public String getPolicy() {
+        return this.policy;
+    }
+
+    /** @return the parameters of the ingestion policy */
+    public Map<String, String> getPolicyParameters() {
+        return this.policyParameters;
+    }
+
+    /** @return the id of the receiving feed */
+    public FeedId getReceivingFeedId() {
+        return this.receivingFeedId;
+    }
+
+    /** @return the current status of this request */
+    public ConnectionStatus getConnectionStatus() {
+        return this.connectionStatus;
+    }
+
+    /** Replaces the current status of this request. */
+    public void setSubscriptionStatus(ConnectionStatus connectionStatus) {
+        this.connectionStatus = connectionStatus;
+    }
+
+    /**
+     * Renders the feed joint key, the connection location and the comma-joined function list.
+     */
+    @Override
+    public String toString() {
+        final String joinedFunctions = StringUtils.join(functionsToApply, ",");
+        StringBuilder text = new StringBuilder("Feed Connection Request ");
+        text.append(feedJointKey);
+        text.append(" [").append(connectionLocation).append("]");
+        text.append(" Apply (").append(joinedFunctions).append(")");
+        return text.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java
new file mode 100644
index 0000000..ae29908
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedConstants.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.asterix.common.feeds;
+
+public class FeedConstants {
+
+    public static final class StatisticsConstants {
+        public static final String INTAKE_TUPLEID = "intake-tupleid";
+        public static final String INTAKE_PARTITION = "intake-partition";
+        public static final String INTAKE_TIMESTAMP = "intake-timestamp";
+        public static final String COMPUTE_TIMESTAMP = "compute-timestamp";
+        public static final String STORE_TIMESTAMP = "store-timestamp";
+
+    }
+
+    public static final class MessageConstants {
+        public static final String MESSAGE_TYPE = "message-type";
+        public static final String NODE_ID = "nodeId";
+        public static final String DATAVERSE = "dataverse";
+        public static final String FEED = "feed";
+        public static final String DATASET = "dataset";
+        public static final String AQL = "aql";
+        public static final String RUNTIME_TYPE = "runtime-type";
+        public static final String PARTITION = "partition";
+        public static final String INTAKE_PARTITION = "intake-partition";
+        public static final String INFLOW_RATE = "inflow-rate";
+        public static final String OUTFLOW_RATE = "outflow-rate";
+        public static final String MODE = "mode";
+        public static final String CURRENT_CARDINALITY = "current-cardinality";
+        public static final String REDUCED_CARDINALITY = "reduced-cardinality";
+        public static final String VALUE_TYPE = "value-type";
+        public static final String VALUE = "value";
+        public static final String CPU_LOAD = "cpu-load";
+        public static final String N_RUNTIMES = "n_runtimes";
+        public static final String HEAP_USAGE = "heap_usage";
+        public static final String OPERAND_ID = "operand-id";
+        public static final String COMPUTE_PARTITION_RETAIN_LIMIT = "compute-partition-retain-limit";
+        public static final String LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP = "last-persisted-tuple-intake_timestamp";
+        public static final String PERSISTENCE_DELAY_WITHIN_LIMIT = "persistence-delay-within-limit";
+        public static final String AVERAGE_PERSISTENCE_DELAY = "average-persistence-delay";
+        public static final String COMMIT_ACKS = "commit-acks";
+        public static final String MAX_WINDOW_ACKED = "max-window-acked";
+        public static final String BASE = "base";
+        public static final String NOT_APPLICABLE = "N/A";
+        
+    }
+
+    public static final class NamingConstants {
+        public static final String LIBRARY_NAME_SEPARATOR = "#";
+    }
+
+    public static class JobConstants {
+        public static final int DEFAULT_FRAME_SIZE = 8192;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java
new file mode 100644
index 0000000..cd9dca4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedExceptionHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.exceptions.FrameDataException;
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
+/**
+ * Handles exceptions raised while a feed frame is processed. For a {@link FrameDataException} it logs
+ * the tuple identified by the exception and returns the frame produced by
+ * {@code FeedFrameUtil.getSlicedFrame}; any other exception is not handled.
+ */
+public class FeedExceptionHandler implements IExceptionHandler {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
+
+    private final IHyracksTaskContext ctx;
+    private final FrameTupleAccessor fta;
+    private final RecordDescriptor recordDesc;
+    private final IFeedManager feedManager;
+    private final FeedConnectionId connectionId;
+
+    public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedManager feedManager, FeedConnectionId connectionId) {
+        this.ctx = ctx;
+        this.fta = fta;
+        this.recordDesc = recordDesc;
+        this.feedManager = feedManager;
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * Handles an exception raised while processing {@code frame}.
+     *
+     * @param e
+     *            the exception that was raised
+     * @param frame
+     *            the frame that was being processed
+     * @return the sliced frame obtained from {@code FeedFrameUtil.getSlicedFrame} for the tuple index
+     *         carried by a {@link FrameDataException}; {@code null} when {@code e} is any other kind of
+     *         exception or when handling itself fails
+     */
+    public ByteBuffer handleException(Exception e, ByteBuffer frame) {
+        if (!(e instanceof FrameDataException)) {
+            return null;
+        }
+        try {
+            fta.reset(frame);
+            int tupleIndex = ((FrameDataException) e).getTupleIndex();
+
+            // logging is best effort: a failure here must not prevent slicing the frame
+            try {
+                logExceptionCausingTuple(tupleIndex, e);
+            } catch (Exception ex) {
+                ex.addSuppressed(e);
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    // pass the throwable so the stack trace (and the suppressed original) reach the log
+                    LOGGER.log(Level.WARNING, "Unable to log exception causing tuple due to..." + ex.getMessage(),
+                            ex);
+                }
+            }
+            // slicing
+            return FeedFrameUtil.getSlicedFrame(ctx, tupleIndex, fta);
+        } catch (Exception exception) {
+            // report through the logger (with the stack trace) instead of printing to stderr
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Unable to handle exception " + exception.getMessage(), exception);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Deserializes the fields of the tuple at {@code tupleIndex}. The first field is handed to the feed
+     * metadata manager together with the exception message; every further field is written to the
+     * warning log.
+     *
+     * @param tupleIndex
+     *            index of the tuple within the frame currently set on the tuple accessor
+     * @param e
+     *            the exception attributed to that tuple
+     */
+    private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
+
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream di = new DataInputStream(bbis);
+
+        // position the stream at the first field of the tuple, just past the field slots
+        int start = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength();
+        bbis.setByteBuffer(fta.getBuffer(), start);
+
+        Object[] record = new Object[recordDesc.getFieldCount()];
+
+        for (int i = 0; i < record.length; ++i) {
+            Object instance = recordDesc.getFields()[i].deserialize(di);
+            if (i == 0) {
+                String tuple = String.valueOf(instance);
+                feedManager.getFeedMetadataManager().logTuple(connectionId, tuple, e.getMessage(), feedManager);
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning(", " + String.valueOf(instance));
+                }
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java
new file mode 100644
index 0000000..310e7c0
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCache.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Allows caching of feed frames. This class is used in providing upstream backup.
+ * The tuples at the intake layer are held in this cache until these are acked by
+ * the storage layer post their persistence. On receiving an ack, the frames whose
+ * largest record id is at most the acked record id are dropped from the cache.
+ * <p>
+ * NOTE(review): the cache is a plain {@link LinkedHashMap}; if frames are added on a different thread
+ * than the one calling the drop/replay methods, access needs synchronization - confirm against
+ * {@code MessageReceiver}.
+ */
+public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
+
+    /**
+     * Value represents a cache feed frame
+     * Key represents the largest record Id in the frame.
+     * At the intake side, the largest record id corresponds to the last record in the frame
+     **/
+    private final Map<Integer, ByteBuffer> orderedCache;
+    private final FrameTupleAccessor tupleAccessor;
+    private final IFrameWriter frameWriter;
+    private final IHyracksTaskContext ctx;
+
+    public FeedFrameCache(IHyracksTaskContext ctx, FrameTupleAccessor tupleAccessor, IFrameWriter frameWriter) {
+        this.tupleAccessor = tupleAccessor;
+        this.frameWriter = frameWriter;
+        /** A LinkedHashMap ensures entries are retrieved in order of their insertion **/
+        this.orderedCache = new LinkedHashMap<Integer, ByteBuffer>();
+        this.ctx = ctx;
+    }
+
+    /**
+     * Caches a copy of the frame, keyed by the record id of its last tuple.
+     * A frame without tuples is ignored, since it has no last tuple to take a record id from.
+     *
+     * @param frame
+     *            the frame to cache
+     */
+    @Override
+    public void processMessage(ByteBuffer frame) throws Exception {
+        tupleAccessor.reset(frame);
+        if (tupleAccessor.getTupleCount() == 0) {
+            return;
+        }
+        int lastRecordId = getLastRecordId(frame);
+        ByteBuffer clone = cloneFrame(frame);
+        orderedCache.put(lastRecordId, clone);
+    }
+
+    /**
+     * Drops, in insertion order, every cached frame whose largest record id is less than or equal to
+     * {@code recordId}; stops at the first frame with a larger id.
+     *
+     * @param recordId
+     *            the largest record id that may be dropped
+     */
+    public void dropTillRecordId(int recordId) {
+        // collect first, remove afterwards: the map must not be modified while it is being iterated
+        List<Integer> dropRecordIds = new ArrayList<Integer>();
+        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+            int recId = entry.getKey();
+            if (recId <= recordId) {
+                dropRecordIds.add(recId);
+            } else {
+                break;
+            }
+        }
+        for (Integer r : dropRecordIds) {
+            orderedCache.remove(r);
+        }
+    }
+
+    /**
+     * Replays every cached record whose id is at least {@code startingRecordId}: the first frame that
+     * reaches the starting id is replayed from that record onwards, and every frame cached after it is
+     * replayed in full.
+     *
+     * @param startingRecordId
+     *            the id of the first record to replay (inclusive)
+     * @throws HyracksDataException
+     *             if the frame writer rejects a replayed frame
+     */
+    public void replayRecords(int startingRecordId) throws HyracksDataException {
+        boolean replayPositionReached = false;
+        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+            if (replayPositionReached) {
+                replayFrame(entry.getValue());
+                continue;
+            }
+            // the key increases monotonically; "<=" so that a starting record that happens to be the
+            // last record of its frame is still replayed
+            int maxRecordIdInFrame = entry.getKey();
+            if (startingRecordId <= maxRecordIdInFrame) {
+                replayFrame(startingRecordId, entry.getValue());
+                replayPositionReached = true;
+            }
+        }
+    }
+
+    /**
+     * Replay the frame from the first tuple (inclusive) whose record id is at least the one specified.
+     * Matching with "at least" rather than "equal" keeps the replay working when the exact id is not
+     * present in the frame.
+     * 
+     * @param recordId
+     * @param frame
+     * @throws HyracksDataException
+     */
+    private void replayFrame(int recordId, ByteBuffer frame) throws HyracksDataException {
+        tupleAccessor.reset(frame);
+        int nTuples = tupleAccessor.getTupleCount();
+        for (int i = 0; i < nTuples; i++) {
+            int rid = getRecordIdAtTupleIndex(i, frame);
+            if (rid >= recordId) {
+                ByteBuffer slicedFrame = splitFrame(i, frame);
+                replayFrame(slicedFrame);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Copies the tuples from {@code beginTupleIndex} to the end of the frame currently set on the tuple
+     * accessor into a newly allocated frame.
+     */
+    private ByteBuffer splitFrame(int beginTupleIndex, ByteBuffer frame) throws HyracksDataException {
+        IFrame slicedFrame = new VSizeFrame(ctx);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        appender.reset(slicedFrame, true);
+        int totalTuples = tupleAccessor.getTupleCount();
+        for (int ti = beginTupleIndex; ti < totalTuples; ti++) {
+            appender.append(tupleAccessor, ti);
+        }
+        return slicedFrame.getBuffer();
+    }
+
+    /**
+     * Replay the frame
+     * 
+     * @param frame
+     * @throws HyracksDataException
+     */
+    private void replayFrame(ByteBuffer frame) throws HyracksDataException {
+        frameWriter.nextFrame(frame);
+    }
+
+    /** Returns the record id of the last tuple in the frame; the frame must hold at least one tuple. */
+    private int getLastRecordId(ByteBuffer frame) {
+        tupleAccessor.reset(frame);
+        int nTuples = tupleAccessor.getTupleCount();
+        return getRecordIdAtTupleIndex(nTuples - 1, frame);
+    }
+
+    /**
+     * Reads the record id of the tuple at {@code tupleIndex} directly from the frame bytes.
+     * NOTE(review): the fixed offsets presumably follow the serialized record layout, with the intake
+     * tuple id stored as an open field - confirm against the record serializer.
+     */
+    private int getRecordIdAtTupleIndex(int tupleIndex, ByteBuffer frame) {
+        tupleAccessor.reset(frame);
+        int recordStart = tupleAccessor.getTupleStartOffset(tupleIndex) + tupleAccessor.getFieldSlotsLength();
+        int openPartOffset = frame.getInt(recordStart + 6);
+        int numOpenFields = frame.getInt(recordStart + openPartOffset);
+        int recordIdOffset = frame.getInt(recordStart + openPartOffset + 4 + numOpenFields * 8
+                + StatisticsConstants.INTAKE_TUPLEID.length() + 2 + 1);
+        int lastRecordId = frame.getInt(recordStart + recordIdOffset);
+        return lastRecordId;
+    }
+
+    /**
+     * Copies the first {@code frame.limit()} bytes of the frame into a new heap buffer of the same
+     * capacity. The source must be array-backed.
+     */
+    private ByteBuffer cloneFrame(ByteBuffer frame) {
+        ByteBuffer clone = ByteBuffer.allocate(frame.capacity());
+        // honour arrayOffset() so that a frame that is a slice of a larger array is copied correctly
+        System.arraycopy(frame.array(), frame.arrayOffset(), clone.array(), 0, frame.limit());
+        return clone;
+    }
+
+    /**
+     * Replays every cached frame, in insertion order.
+     *
+     * @throws HyracksDataException
+     *             if the frame writer rejects a replayed frame
+     */
+    public void replayAll() throws HyracksDataException {
+        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
+            ByteBuffer frame = entry.getValue();
+            frameWriter.nextFrame(frame);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java
new file mode 100644
index 0000000..e8e424c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameCollector.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IMessageReceiver;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedFrameCollector extends MessageReceiver<DataBucket> implements IMessageReceiver<DataBucket> {
+
+    private final FeedConnectionId connectionId;
+    private final FrameDistributor frameDistributor;
+    private FeedPolicyAccessor fpa;
+    private IFrameWriter frameWriter;
+    private State state;
+
+    public enum State {
+        ACTIVE,
+        FINISHED,
+        TRANSITION,
+        HANDOVER
+    }
+
+    public FeedFrameCollector(FrameDistributor frameDistributor, FeedPolicyAccessor feedPolicyAccessor,
+            IFrameWriter frameWriter, FeedConnectionId connectionId) {
+        super();
+        this.frameDistributor = frameDistributor;
+        this.fpa = feedPolicyAccessor;
+        this.connectionId = connectionId;
+        this.frameWriter = frameWriter;
+        this.state = State.ACTIVE;
+    }
+
+    @Override
+    public void processMessage(DataBucket bucket) throws Exception {
+        try {
+            ByteBuffer frame = bucket.getContent();
+            switch (bucket.getContentType()) {
+                case DATA:
+                    frameWriter.nextFrame(frame);
+                    break;
+                case EOD:
+                    closeCollector();
+                    break;
+                case EOSD:
+                    throw new AsterixException("Received data bucket with content of type " + bucket.getContentType());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to process data bucket " + bucket + ", encountered exception " + e.getMessage());
+            }
+        } finally {
+            bucket.doneReading();
+        }
+    }
+
+    public void closeCollector() {
+        if (state.equals(State.TRANSITION)) {
+            super.close(true);
+            setState(State.ACTIVE);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(this + " is now " + State.ACTIVE + " mode, processing frames synchronously");
+            }
+        } else {
+            flushPendingMessages();
+            setState(State.FINISHED);
+            synchronized (frameDistributor.getRegisteredCollectors()) {
+                frameDistributor.getRegisteredCollectors().notifyAll();
+            }
+            disconnect();
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Closed collector " + this);
+        }
+    }
+
+    public synchronized void disconnect() {
+        setState(State.FINISHED);
+    }
+
+    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        frameWriter.nextFrame(frame);
+    }
+
+    public FeedPolicyAccessor getFeedPolicyAccessor() {
+        return fpa;
+    }
+
+    public synchronized State getState() {
+        return state;
+    }
+
+    public synchronized void setState(State state) {
+        this.state = state;
+        switch (state) {
+            case FINISHED:
+            case HANDOVER:
+                notifyAll();
+                break;
+            default:
+                break;
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Frame Collector " + this.frameDistributor.getFeedRuntimeType() + " switched to " + state);
+        }
+    }
+
+    public IFrameWriter getFrameWriter() {
+        return frameWriter;
+    }
+
+    public void setFrameWriter(IFrameWriter frameWriter) {
+        this.frameWriter = frameWriter;
+    }
+
+    @Override
+    public String toString() {
+        return "FrameCollector " + connectionId + "," + state + "]";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o instanceof FeedFrameCollector) {
+            return connectionId.equals(((FeedFrameCollector) o).connectionId);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return connectionId.toString().hashCode();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java
new file mode 100644
index 0000000..437ed95
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameDiscarder.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Keeps track of how many messages have been discarded for a feed runtime and decides,
+ * based on the maximum discard fraction of the feed policy, whether one more message
+ * may be discarded.
+ */
+public class FeedFrameDiscarder {
+
+    // was created with FeedFrameSpiller.class, which attributed discard logs to the wrong logger
+    private static final Logger LOGGER = Logger.getLogger(FeedFrameDiscarder.class.getName());
+
+    private final IHyracksTaskContext ctx;
+    private final FeedRuntimeInputHandler inputHandler;
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final FeedPolicyAccessor policyAccessor;
+    // discard fraction read from the policy once, at construction time
+    private final float maxFractionDiscard;
+    // number of messages this instance has agreed to discard so far
+    private int nDiscarded;
+
+    /**
+     * @param ctx            the task context (stored, not otherwise used by this class)
+     * @param connectionId   the feed connection, used in log messages
+     * @param runtimeId      the feed runtime, used in log messages
+     * @param policyAccessor the source of the maximum discard fraction
+     * @param inputHandler   the source of the processed count the discard limit is derived from
+     * @throws IOException declared for callers; not thrown by the visible code
+     */
+    public FeedFrameDiscarder(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            FeedPolicyAccessor policyAccessor, FeedRuntimeInputHandler inputHandler) throws IOException {
+        this.ctx = ctx;
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.policyAccessor = policyAccessor;
+        this.inputHandler = inputHandler;
+        this.maxFractionDiscard = policyAccessor.getMaxFractionDiscard();
+    }
+
+    /**
+     * Decides whether the given message may be discarded and, if so, counts it.
+     *
+     * @param message the candidate message; its content is not inspected
+     * @return {@code true} if the message was counted as discarded; {@code false} if
+     *         the policy's discard fraction is zero or the discard limit
+     *         (processed count times the discard fraction) has been reached
+     */
+    public boolean processMessage(ByteBuffer message) {
+        // NOTE(review): the gate reads the live policy value while the limit uses the value
+        // cached at construction time - confirm the policy cannot change in between
+        if (policyAccessor.getMaxFractionDiscard() == 0) {
+            return false;
+        }
+        long nProcessed = inputHandler.getProcessed();
+        long discardLimit = (long) (nProcessed * maxFractionDiscard);
+        if (nDiscarded >= discardLimit) {
+            return false;
+        }
+        nDiscarded++;
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Discarded frame by " + connectionId + " (" + runtimeId + ")" + " count so far  ("
+                    + nDiscarded + ") Limit [" + discardLimit + "]");
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java
new file mode 100644
index 0000000..5c41fd4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameHandlers.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedFrameHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FeedFrameHandlers {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
+
+    /**
+     * Selects which {@link IFeedFrameHandler} implementation
+     * {@code getFeedFrameHandler} creates.
+     */
+    public enum RoutingMode {
+        // handled by InMemoryRouter
+        IN_MEMORY_ROUTE,
+        // handled by DiskSpiller
+        SPILL_TO_DISK,
+        // handled by DiscardRouter
+        DISCARD
+    }
+
+    /**
+     * Creates the frame handler that corresponds to the requested routing mode.
+     *
+     * @param distributor the frame distributor the handler works for
+     * @param feedId      the feed the handler belongs to
+     * @param routingMode the mode that selects the handler implementation
+     * @param runtimeType the type of the feed runtime
+     * @param partition   the partition of the feed runtime
+     * @param frameSize   the frame size, passed to the disk spiller only
+     * @return a new handler instance for {@code routingMode}
+     * @throws IOException if the handler cannot be constructed
+     * @throws IllegalArgumentException if the routing mode is not one of the known modes
+     */
+    public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
+            RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
+        switch (routingMode) {
+            case IN_MEMORY_ROUTE:
+                return new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
+            case SPILL_TO_DISK:
+                return new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
+            case DISCARD:
+                return new DiscardRouter(distributor, feedId, runtimeType, partition);
+            default:
+                throw new IllegalArgumentException("Invalid routing mode" + routingMode);
+        }
+    }
+
+    /**
+     * Frame handler that drops everything it receives and only keeps a running count.
+     * NOTE(review): {@code handleFrame} adds the number of tuples in the frame whereas
+     * {@code handleDataBucket} adds one per bucket, so the counter mixes two units.
+     */
+    public static class DiscardRouter implements IFeedFrameHandler {
+
+        private final FeedId feedId;
+        private int nDiscarded;
+        private final FeedRuntimeType runtimeType;
+        private final int partition;
+        private final FrameDistributor distributor;
+
+        /**
+         * @param distributor the distributor whose tuple accessor is used to count tuples
+         * @param feedId      the feed this router belongs to, used in summaries
+         * @param runtimeType the runtime type, used in log messages
+         * @param partition   the partition, used in log messages
+         * @throws HyracksDataException declared for callers; not thrown by the visible code
+         */
+        public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
+                throws HyracksDataException {
+            this.distributor = distributor;
+            this.feedId = feedId;
+            this.nDiscarded = 0;
+            this.runtimeType = runtimeType;
+            this.partition = partition;
+        }
+
+        /**
+         * Drops the frame after adding its tuple count to the discard counter.
+         */
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            FrameTupleAccessor fta = distributor.getFta();
+            fta.reset(frame);
+            int nTuples = fta.getTupleCount();
+            nDiscarded += nTuples;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Discarded additional [" + runtimeType + "]" + "(" + partition + ")" + "  " + nTuples);
+            }
+        }
+
+        /**
+         * Drops the bucket and increments the discard counter by one.
+         */
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            nDiscarded++;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Discard Count" + nDiscarded);
+            }
+        }
+
+        @Override
+        public void close() {
+            // do nothing, no resource to relinquish
+        }
+
+        /**
+         * Not supported: discarded data cannot be replayed.
+         *
+         * @throws IllegalStateException always
+         */
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            throw new IllegalStateException("Invalid operation");
+        }
+
+        @Override
+        public String toString() {
+            return "DiscardRouter" + "[" + feedId + "]" + "(" + nDiscarded + ")";
+        }
+
+        /**
+         * @return a one-line summary containing the feed id and the discard counter
+         */
+        @Override
+        public String getSummary() {
+            // the concatenation already yields a String; wrapping it in new String(...) only copied it
+            return "Number of discarded frames (since last reset)" + " feedId " + "[" + feedId + "]" + "("
+                    + nDiscarded + ")";
+        }
+
+    }
+
+    /**
+     * Frame handler that hands every data bucket to all frame collectors it was created
+     * with. Raw frames and replay are not supported.
+     */
+    public static class InMemoryRouter implements IFeedFrameHandler {
+
+        private final Collection<FeedFrameCollector> frameCollectors;
+
+        /**
+         * @param frameCollectors the collectors that receive each bucket
+         * @param runtimeType     accepted for signature compatibility; not used
+         * @param partition       accepted for signature compatibility; not used
+         */
+        public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType, int partition) {
+            this.frameCollectors = frameCollectors;
+        }
+
+        /**
+         * @throws IllegalStateException always; this router only routes data buckets
+         */
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        /**
+         * Sends the bucket to every registered collector, in iteration order.
+         */
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            final Iterator<FeedFrameCollector> targets = frameCollectors.iterator();
+            while (targets.hasNext()) {
+                // asynchronous call
+                targets.next().sendMessage(bucket);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+
+        /**
+         * @throws IllegalStateException always; nothing is retained for replay
+         */
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        @Override
+        public String getSummary() {
+            return "InMemoryRouter Summary";
+        }
+    }
+
+    /**
+     * Frame handler that appends every frame to a spill file and can replay the spilled
+     * frames later. Each record in the file is a 4-byte length followed by that many
+     * bytes of frame data.
+     */
+    public static class DiskSpiller implements IFeedFrameHandler {
+
+        private final FeedId feedId;
+        private FrameSpiller<ByteBuffer> receiver;
+        private Iterator<ByteBuffer> iterator;
+
+        /**
+         * @param distributor the distributor whose runtime type and partition name the spill file
+         * @param feedId      the feed whose frames are spilled
+         * @param runtimeType accepted for signature compatibility; not used
+         * @param partition   accepted for signature compatibility; not used
+         * @param frameSize   the capacity of the buffer used when replaying frames
+         * @throws IOException if the spiller cannot be created
+         */
+        public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
+                int frameSize) throws IOException {
+            this.feedId = feedId;
+            receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
+        }
+
+        /**
+         * Hands the frame to the spiller, which writes it to the spill file.
+         */
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            receiver.sendMessage(frame);
+        }
+
+        /**
+         * Message receiver that writes length-prefixed frames to a lazily created file
+         * and reads them back on demand.
+         */
+        private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
+
+            // size in bytes of the length prefix written in front of every frame
+            private static final int LENGTH_PREFIX_SIZE = 4;
+
+            private final int frameSize;
+            private final FeedId feedId;
+            private BufferedOutputStream bos;
+            // used by the writing side only
+            private final ByteBuffer reusableLengthBuffer;
+            // used by the replaying side only; returned from the replay iterator
+            private final ByteBuffer reusableDataBuffer;
+            // number of bytes of the spill file that have already been replayed
+            private long offset;
+            private File file;
+            private final FrameDistributor frameDistributor;
+            private boolean fileCreated = false;
+
+            public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
+                this.feedId = feedId;
+                this.frameSize = frameSize;
+                this.frameDistributor = distributor;
+                reusableLengthBuffer = ByteBuffer.allocate(LENGTH_PREFIX_SIZE);
+                reusableDataBuffer = ByteBuffer.allocate(frameSize);
+                this.offset = 0;
+            }
+
+            /**
+             * Appends the message's backing array to the spill file, preceded by its length.
+             * The spill file is created on the first call.
+             */
+            @Override
+            public void processMessage(ByteBuffer message) throws Exception {
+                if (!fileCreated) {
+                    createFile();
+                    fileCreated = true;
+                }
+                byte[] payload = message.array();
+                // clear(), not flip(): flipping a buffer whose position is 0 sets its limit to 0
+                // and the following putInt would fail with a BufferOverflowException
+                reusableLengthBuffer.clear();
+                reusableLengthBuffer.putInt(payload.length);
+                bos.write(reusableLengthBuffer.array());
+                bos.write(payload);
+            }
+
+            private void createFile() throws IOException {
+                Date date = new Date();
+                String dateSuffix = date.toString().replace(' ', '_');
+                String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
+                        + frameDistributor.getPartition() + "_" + dateSuffix;
+
+                file = new File(fileName);
+                if (!file.exists()) {
+                    boolean success = file.createNewFile();
+                    if (!success) {
+                        throw new IOException("Unable to create spill file for feed " + feedId);
+                    }
+                }
+                bos = new BufferedOutputStream(new FileOutputStream(file));
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Created Spill File for feed " + feedId);
+                }
+            }
+
+            /**
+             * Reads exactly {@code length} bytes into {@code target}, looping over short reads.
+             *
+             * @throws IOException if the stream ends before {@code length} bytes were read
+             */
+            private static void readFully(BufferedInputStream in, byte[] target, int length) throws IOException {
+                int done = 0;
+                while (done < length) {
+                    int n = in.read(target, done, length - done);
+                    if (n < 0) {
+                        throw new IOException("Unexpected end of spill file");
+                    }
+                    done += n;
+                }
+            }
+
+            /**
+             * Returns an iterator over the spilled frames that have not been replayed yet.
+             * The iterator reuses a single buffer, so every call to {@code next()}
+             * overwrites the previously returned frame.
+             *
+             * @throws IOException if nothing has been spilled yet or the file cannot be read
+             */
+            @SuppressWarnings("resource")
+            public Iterator<ByteBuffer> replayData() throws Exception {
+                if (file == null || bos == null) {
+                    throw new IOException("No frame has been spilled to disk for feed " + feedId);
+                }
+                // make sure buffered frames are on disk before they are read back
+                bos.flush();
+                final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
+                // skip() may skip fewer bytes than requested, so repeat until done
+                long remaining = offset;
+                while (remaining > 0) {
+                    long skipped = bis.skip(remaining);
+                    if (skipped <= 0) {
+                        break;
+                    }
+                    remaining -= skipped;
+                }
+                // private to this iterator so that replay never interferes with the writing side
+                final ByteBuffer lengthBuffer = ByteBuffer.allocate(LENGTH_PREFIX_SIZE);
+                return new Iterator<ByteBuffer>() {
+
+                    // set once the stream has been drained and closed
+                    private boolean exhausted = false;
+
+                    @Override
+                    public boolean hasNext() {
+                        if (exhausted) {
+                            return false;
+                        }
+                        try {
+                            if (bis.available() > 0) {
+                                return true;
+                            }
+                            exhausted = true;
+                            bis.close();
+                        } catch (IOException e) {
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.log(Level.WARNING, "Unable to read spill file for feed " + feedId, e);
+                            }
+                        }
+                        return false;
+                    }
+
+                    @Override
+                    public ByteBuffer next() {
+                        try {
+                            readFully(bis, lengthBuffer.array(), LENGTH_PREFIX_SIZE);
+                            // absolute read: independent of the buffer's position and limit
+                            int length = lengthBuffer.getInt(0);
+                            if (length < 0 || length > reusableDataBuffer.capacity()) {
+                                throw new IOException("Spilled frame length " + length
+                                        + " does not fit the replay buffer of capacity "
+                                        + reusableDataBuffer.capacity());
+                            }
+                            reusableDataBuffer.clear();
+                            readFully(bis, reusableDataBuffer.array(), length);
+                            // expose exactly the bytes that were read
+                            reusableDataBuffer.limit(length);
+                            offset += LENGTH_PREFIX_SIZE + length;
+                        } catch (IOException e) {
+                            // returning the stale buffer would silently replay wrong data
+                            throw new IllegalStateException("Unable to replay spilled frame for feed " + feedId, e);
+                        }
+                        return reusableDataBuffer;
+                    }
+
+                    @Override
+                    public void remove() {
+                        // removal is a no-op; replayed data stays in the spill file
+                    }
+
+                };
+            }
+
+        }
+
+        /**
+         * @throws IllegalStateException always; this handler only accepts raw frames
+         */
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        @Override
+        public void close() {
+            // NOTE(review): the spill file's output stream is never closed here; whether it is
+            // safe to close it right after this call depends on MessageReceiver.close - confirm
+            receiver.close(true);
+        }
+
+        /**
+         * @return an iterator over the frames spilled so far that have not been replayed
+         * @throws HyracksDataException wrapping any failure to open the spill file
+         */
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            try {
+                iterator = receiver.replayData();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            return iterator;
+        }
+
+        //TODO: Form a summary that includes stats related to what has been spilled to disk
+        @Override
+        public String getSummary() {
+            return "Disk Spiller Summary";
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java
new file mode 100644
index 0000000..db8145f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameSpiller.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Writes frames of a feed runtime to a spill file, subject to the disk budget of the
+ * feed policy, and replays the spilled frames on demand.
+ */
+public class FeedFrameSpiller {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
+
+    private final IHyracksTaskContext ctx;
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final FeedPolicyAccessor policyAccessor;
+    private BufferedOutputStream bos;
+    private File file;
+    private boolean fileCreated = false;
+    // bytes written to the current spill file; compared against the policy's limit
+    private long bytesWritten = 0;
+    private int spilledFrameCount = 0;
+
+    /**
+     * @param ctx            the task context used to allocate frames during replay
+     * @param connectionId   the feed connection, used to name the spill file
+     * @param runtimeId      the feed runtime, used to name the spill file and in log messages
+     * @param policyAccessor the source of the maximum number of bytes that may be spilled
+     * @throws IOException declared for callers; not thrown by the visible code
+     */
+    public FeedFrameSpiller(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            FeedPolicyAccessor policyAccessor) throws IOException {
+        this.ctx = ctx;
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.policyAccessor = policyAccessor;
+    }
+
+    /**
+     * Appends the message's backing array to the spill file, creating the file on first use.
+     *
+     * @param message the frame to spill
+     * @return {@code true} if the frame was written; {@code false} if writing it would
+     *         exceed the policy's spill limit
+     * @throws Exception if the spill file cannot be created or written
+     */
+    public boolean processMessage(ByteBuffer message) throws Exception {
+        if (!fileCreated) {
+            createFile();
+            fileCreated = true;
+        }
+        byte[] payload = message.array();
+        long maxAllowed = policyAccessor.getMaxSpillOnDisk();
+        if (maxAllowed != FeedPolicyAccessor.NO_LIMIT && bytesWritten + payload.length > maxAllowed) {
+            return false;
+        }
+        bos.write(payload);
+        bytesWritten += payload.length;
+        spilledFrameCount++;
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Spilled frame by " + runtimeId + " spill count " + spilledFrameCount);
+        }
+        return true;
+    }
+
+    private void createFile() throws IOException {
+        Date date = new Date();
+        String dateSuffix = date.toString().replace(' ', '_');
+        String fileName = connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
+                + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition() + "_" + dateSuffix;
+
+        file = new File(fileName);
+        if (!file.exists()) {
+            boolean success = file.createNewFile();
+            if (!success) {
+                throw new IOException("Unable to create spill file " + fileName + " for feed " + runtimeId);
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Created spill file " + file.getAbsolutePath());
+                }
+            }
+        }
+        bos = new BufferedOutputStream(new FileOutputStream(file));
+
+    }
+
+    /**
+     * Flushes pending writes and returns an iterator over the frames in the spill file.
+     *
+     * @return an iterator that reads the spill file from its beginning
+     * @throws IllegalStateException if no spill file is currently open (nothing has been
+     *         spilled since construction or since the last {@code reset()})
+     * @throws Exception if the file cannot be flushed or opened
+     */
+    public Iterator<ByteBuffer> replayData() throws Exception {
+        if (bos == null || file == null) {
+            throw new IllegalStateException("No frames have been spilled by " + runtimeId);
+        }
+        bos.flush();
+        // use the path the file was created with rather than only its last name component
+        return new FrameIterator(ctx, file.getPath());
+    }
+
+    /**
+     * Iterator that reads the spill file one frame at a time, allocating a new frame
+     * from the task context for every element.
+     * NOTE(review): every element is read with the size of a freshly allocated frame -
+     * confirm that spilled frames always have exactly that size.
+     */
+    private static class FrameIterator implements Iterator<ByteBuffer> {
+
+        private final BufferedInputStream bis;
+        private final IHyracksTaskContext ctx;
+        private int readFrameCount = 0;
+        // set once the stream has been drained and closed
+        private boolean exhausted = false;
+
+        public FrameIterator(IHyracksTaskContext ctx, String filename) throws FileNotFoundException {
+            bis = new BufferedInputStream(new FileInputStream(new File(filename)));
+            this.ctx = ctx;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (exhausted) {
+                return false;
+            }
+            try {
+                if (bis.available() > 0) {
+                    return true;
+                }
+                exhausted = true;
+                bis.close();
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unable to read spill file", e);
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public ByteBuffer next() {
+            try {
+                IFrame frame = new VSizeFrame(ctx);
+                byte[] data = frame.getBuffer().array();
+                Arrays.fill(data, (byte) 0);
+                // a single read() may return fewer bytes than requested, so loop until the
+                // frame is full; a frame cut short by end of file stays zero-padded
+                int frameSize = frame.getFrameSize();
+                int filled = 0;
+                while (filled < frameSize) {
+                    int n = bis.read(data, filled, frameSize - filled);
+                    if (n < 0) {
+                        break;
+                    }
+                    filled += n;
+                }
+                readFrameCount++;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Read spill frome " + readFrameCount);
+                }
+                return frame.getBuffer();
+            } catch (IOException e) {
+                // previously the failure was printed and a null frame was dereferenced
+                throw new IllegalStateException("Unable to read spilled frame " + (readFrameCount + 1), e);
+            }
+        }
+
+        @Override
+        public void remove() {
+            // removal is a no-op; replayed data stays in the spill file
+        }
+
+    }
+
+    /**
+     * Prepares this spiller for a new spill file: closes the current output stream,
+     * forgets the current file (it is not deleted) and resets the byte counter.
+     */
+    public void reset() {
+        // close before dropping the reference, otherwise the file handle leaks
+        closeOutput();
+        bytesWritten = 0;
+        //  file.delete();
+        file = null;
+        fileCreated = false;
+        bos = null;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Resetted the FrameSpiller!");
+        }
+    }
+
+    /**
+     * Flushes and closes the spill file's output stream, if one is open. Failures are
+     * logged and not propagated.
+     */
+    public void close() {
+        closeOutput();
+    }
+
+    // Flushes and closes the current output stream, logging (not throwing) any failure.
+    private void closeOutput() {
+        if (bos == null) {
+            return;
+        }
+        try {
+            bos.flush();
+            bos.close();
+        } catch (IOException e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Unable to close spill file of " + runtimeId, e);
+            }
+        }
+    }
+
+}
\ No newline at end of file