You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:09 UTC

[15/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
new file mode 100644
index 0000000..6ad00f1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedFrameHandler;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.MessageReceiver;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FeedFrameHandlers {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
+
+    public enum RoutingMode {
+        IN_MEMORY_ROUTE,
+        SPILL_TO_DISK,
+        DISCARD
+    }
+
+    public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
+            RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
+        IFeedFrameHandler handler = null;
+        switch (routingMode) {
+            case IN_MEMORY_ROUTE:
+                handler = new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
+                break;
+            case SPILL_TO_DISK:
+                handler = new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
+                break;
+            case DISCARD:
+                handler = new DiscardRouter(distributor, feedId, runtimeType, partition);
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid routing mode" + routingMode);
+        }
+        return handler;
+    }
+
+    public static class DiscardRouter implements IFeedFrameHandler {
+
+        private final FeedId feedId;
+        private int nDiscarded;
+        private final FeedRuntimeType runtimeType;
+        private final int partition;
+        private final FrameDistributor distributor;
+
+        public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
+                throws HyracksDataException {
+            this.distributor = distributor;
+            this.feedId = feedId;
+            this.nDiscarded = 0;
+            this.runtimeType = runtimeType;
+            this.partition = partition;
+        }
+
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            FrameTupleAccessor fta = distributor.getFta();
+            fta.reset(frame);
+            int nTuples = fta.getTupleCount();
+            nDiscarded += nTuples;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Discarded additional [" + runtimeType + "]" + "(" + partition + ")" + "  " + nTuples);
+            }
+        }
+
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            nDiscarded++;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Discard Count" + nDiscarded);
+            }
+        }
+
+        @Override
+        public void close() {
+            // do nothing, no resource to relinquish
+        }
+
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            throw new IllegalStateException("Invalid operation");
+        }
+
+        @Override
+        public String toString() {
+            return "DiscardRouter" + "[" + feedId + "]" + "(" + nDiscarded + ")";
+        }
+
+        @Override
+        public String getSummary() {
+            return new String("Number of discarded frames (since last reset)" + " feedId " + "[" + feedId + "]" + "("
+                    + nDiscarded + ")");
+        }
+
+    }
+
+    public static class InMemoryRouter implements IFeedFrameHandler {
+
+        private final Collection<FeedFrameCollector> frameCollectors;
+
+        public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType,
+                int partition) {
+            this.frameCollectors = frameCollectors;
+        }
+
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            for (FeedFrameCollector collector : frameCollectors) {
+                collector.sendMessage(bucket); // asynchronous call
+            }
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        @Override
+        public String getSummary() {
+            return "InMemoryRouter Summary";
+        }
+    }
+
+    public static class DiskSpiller implements IFeedFrameHandler {
+
+        private FrameSpiller<ByteBuffer> receiver;
+        private Iterator<ByteBuffer> iterator;
+
+        public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
+                int frameSize) throws IOException {
+            receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
+        }
+
+        @Override
+        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
+            receiver.sendMessage(frame);
+        }
+
+        private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
+
+            private final FeedId feedId;
+            private BufferedOutputStream bos;
+            private final ByteBuffer reusableLengthBuffer;
+            private final ByteBuffer reusableDataBuffer;
+            private long offset;
+            private File file;
+            private final FrameDistributor frameDistributor;
+            private boolean fileCreated = false;
+
+            public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
+                this.feedId = feedId;
+                this.frameDistributor = distributor;
+                reusableLengthBuffer = ByteBuffer.allocate(4);
+                reusableDataBuffer = ByteBuffer.allocate(frameSize);
+                this.offset = 0;
+            }
+
+            @Override
+            public void processMessage(ByteBuffer message) throws Exception {
+                if (!fileCreated) {
+                    createFile();
+                    fileCreated = true;
+                }
+                reusableLengthBuffer.flip();
+                reusableLengthBuffer.putInt(message.array().length);
+                bos.write(reusableLengthBuffer.array());
+                bos.write(message.array());
+            }
+
+            private void createFile() throws IOException {
+                Date date = new Date();
+                String dateSuffix = date.toString().replace(' ', '_');
+                String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
+                        + frameDistributor.getPartition() + "_" + dateSuffix;
+
+                file = new File(fileName);
+                if (!file.exists()) {
+                    boolean success = file.createNewFile();
+                    if (!success) {
+                        throw new IOException("Unable to create spill file for feed " + feedId);
+                    }
+                }
+                bos = new BufferedOutputStream(new FileOutputStream(file));
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Created Spill File for feed " + feedId);
+                }
+            }
+
+            @SuppressWarnings("resource")
+            public Iterator<ByteBuffer> replayData() throws Exception {
+                final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
+                bis.skip(offset);
+                return new Iterator<ByteBuffer>() {
+
+                    @Override
+                    public boolean hasNext() {
+                        boolean more = false;
+                        try {
+                            more = bis.available() > 0;
+                            if (!more) {
+                                bis.close();
+                            }
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+
+                        return more;
+                    }
+
+                    @Override
+                    public ByteBuffer next() {
+                        reusableLengthBuffer.flip();
+                        try {
+                            bis.read(reusableLengthBuffer.array());
+                            reusableLengthBuffer.flip();
+                            int frameSize = reusableLengthBuffer.getInt();
+                            reusableDataBuffer.flip();
+                            bis.read(reusableDataBuffer.array(), 0, frameSize);
+                            offset += 4 + frameSize;
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        return reusableDataBuffer;
+                    }
+
+                    @Override
+                    public void remove() {
+                    }
+
+                };
+            }
+
+        }
+
+        @Override
+        public void handleDataBucket(DataBucket bucket) {
+            throw new IllegalStateException("Operation not supported");
+        }
+
+        @Override
+        public void close() {
+            receiver.close(true);
+        }
+
+        @Override
+        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
+            try {
+                iterator = receiver.replayData();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+            return iterator;
+        }
+
+        //TODO: Form a summary that includes stats related to what has been spilled to disk
+        @Override
+        public String getSummary() {
+            return "Disk Spiller Summary";
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
new file mode 100644
index 0000000..c9a29ac
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameSpiller.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedFrameSpiller {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
+
+    private final IHyracksTaskContext ctx;
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final FeedPolicyAccessor policyAccessor;
+    private BufferedOutputStream bos;
+    private File file;
+    private boolean fileCreated = false;
+    private long bytesWritten = 0;
+    private int spilledFrameCount = 0;
+
+    public FeedFrameSpiller(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            FeedPolicyAccessor policyAccessor) throws HyracksDataException {
+        this.ctx = ctx;
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.policyAccessor = policyAccessor;
+    }
+
+    public boolean processMessage(ByteBuffer message) throws HyracksDataException {
+        if (!fileCreated) {
+            createFile();
+            fileCreated = true;
+        }
+        long maxAllowed = policyAccessor.getMaxSpillOnDisk();
+        if (maxAllowed != FeedPolicyAccessor.NO_LIMIT && bytesWritten + message.array().length > maxAllowed) {
+            return false;
+        } else {
+            try {
+                bos.write(message.array());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            bytesWritten += message.array().length;
+            spilledFrameCount++;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Spilled frame by " + runtimeId + " spill count " + spilledFrameCount);
+            }
+            return true;
+        }
+    }
+
+    private void createFile() throws HyracksDataException {
+        try {
+            Date date = new Date();
+            String dateSuffix = date.toString().replace(' ', '_');
+            String fileName = connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
+                    + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition() + "_" + dateSuffix;
+
+            file = new File(fileName);
+            if (!file.exists()) {
+                boolean success = file.createNewFile();
+                if (!success) {
+                    throw new HyracksDataException(
+                            "Unable to create spill file " + fileName + " for feed " + runtimeId);
+                } else {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Created spill file " + file.getAbsolutePath());
+                    }
+                }
+            }
+            bos = new BufferedOutputStream(new FileOutputStream(file));
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
+        }
+    }
+
+    public Iterator<ByteBuffer> replayData() throws Exception {
+        bos.flush();
+        return new FrameIterator(ctx, file.getName());
+    }
+
+    private static class FrameIterator implements Iterator<ByteBuffer> {
+
+        private final BufferedInputStream bis;
+        private final IHyracksTaskContext ctx;
+        private int readFrameCount = 0;
+
+        public FrameIterator(IHyracksTaskContext ctx, String filename) throws FileNotFoundException {
+            bis = new BufferedInputStream(new FileInputStream(new File(filename)));
+            this.ctx = ctx;
+        }
+
+        @Override
+        public boolean hasNext() {
+            boolean more = false;
+            try {
+                more = bis.available() > 0;
+                if (!more) {
+                    bis.close();
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            return more;
+        }
+
+        @Override
+        public ByteBuffer next() {
+            IFrame frame = null;
+            try {
+                frame = new VSizeFrame(ctx);
+                Arrays.fill(frame.getBuffer().array(), (byte) 0);
+                bis.read(frame.getBuffer().array(), 0, frame.getFrameSize());
+                readFrameCount++;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Read spill frome " + readFrameCount);
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            return frame.getBuffer();
+        }
+
+        @Override
+        public void remove() {
+        }
+
+    }
+
+    public void reset() {
+        bytesWritten = 0;
+        //  file.delete();
+        fileCreated = false;
+        bos = null;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Resetted the FrameSpiller!");
+        }
+    }
+
+    public void close() {
+        if (bos != null) {
+            try {
+                bos.flush();
+                bos.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
new file mode 100644
index 0000000..f08243e
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleAccessor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FeedFrameTupleAccessor implements IFrameTupleAccessor {
+
+    private final FrameTupleAccessor frameAccessor;
+    private final int numOpenFields;
+
+    public FeedFrameTupleAccessor(FrameTupleAccessor frameAccessor) {
+        this.frameAccessor = frameAccessor;
+        int firstRecordStart = frameAccessor.getTupleStartOffset(0) + frameAccessor.getFieldSlotsLength();
+        int openPartOffsetOrig = frameAccessor.getBuffer().getInt(firstRecordStart + 6);
+        numOpenFields = frameAccessor.getBuffer().getInt(firstRecordStart + openPartOffsetOrig);
+    }
+
+    public int getFeedIntakePartition(int tupleIndex) {
+        ByteBuffer buffer = frameAccessor.getBuffer();
+        int recordStart = frameAccessor.getTupleStartOffset(tupleIndex) + frameAccessor.getFieldSlotsLength();
+        int openPartOffsetOrig = buffer.getInt(recordStart + 6);
+        int partitionOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+                + StatisticsConstants.INTAKE_PARTITION.length() + 2 + 1;
+        return buffer.getInt(recordStart + partitionOffset);
+    }
+    
+    
+
+    @Override
+    public int getFieldCount() {
+        return frameAccessor.getFieldCount();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return frameAccessor.getFieldSlotsLength();
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldEndOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldLength(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return frameAccessor.getTupleEndOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return frameAccessor.getTupleStartOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleCount() {
+        return frameAccessor.getTupleCount();
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return frameAccessor.getBuffer();
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        frameAccessor.reset(buffer);
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        return frameAccessor.getTupleLength(tupleIndex);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
new file mode 100644
index 0000000..d43f90d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameTupleDecorator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class FeedFrameTupleDecorator {
+
+    private AMutableString aString = new AMutableString("");
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AtomicInteger tupleId;
+
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    private static ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+
+    private final int partition;
+    private final ArrayBackedValueStorage attrNameStorage;
+    private final ArrayBackedValueStorage attrValueStorage;
+
+    public FeedFrameTupleDecorator(int partition) {
+        this.tupleId = new AtomicInteger(0);
+        this.partition = partition;
+        this.attrNameStorage = new ArrayBackedValueStorage();
+        this.attrValueStorage = new ArrayBackedValueStorage();
+    }
+
+    public void addLongAttribute(String attrName, long attrValue, IARecordBuilder recordBuilder)
+            throws HyracksDataException, AsterixException {
+        attrNameStorage.reset();
+        aString.setValue(attrName);
+        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
+
+        attrValueStorage.reset();
+        aInt64.setValue(attrValue);
+        int64Serde.serialize(aInt64, attrValueStorage.getDataOutput());
+
+        recordBuilder.addField(attrNameStorage, attrValueStorage);
+    }
+
+    public void addIntegerAttribute(String attrName, int attrValue, IARecordBuilder recordBuilder)
+            throws HyracksDataException, AsterixException {
+        attrNameStorage.reset();
+        aString.setValue(attrName);
+        stringSerde.serialize(aString, attrNameStorage.getDataOutput());
+
+        attrValueStorage.reset();
+        aInt32.setValue(attrValue);
+        int32Serde.serialize(aInt32, attrValueStorage.getDataOutput());
+
+        recordBuilder.addField(attrNameStorage, attrValueStorage);
+    }
+
+    public void addTupleId(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addIntegerAttribute(StatisticsConstants.INTAKE_TUPLEID, tupleId.incrementAndGet(), recordBuilder);
+    }
+
+    public void addIntakePartition(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addIntegerAttribute(StatisticsConstants.INTAKE_PARTITION, partition, recordBuilder);
+    }
+
+    public void addIntakeTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addLongAttribute(StatisticsConstants.INTAKE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
+    }
+
+    public void addStoreTimestamp(IARecordBuilder recordBuilder) throws HyracksDataException, AsterixException {
+        addLongAttribute(StatisticsConstants.STORE_TIMESTAMP, System.currentTimeMillis(), recordBuilder);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
new file mode 100644
index 0000000..3a46b1a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IExceptionHandler;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.dataflow.DataBucket.ContentType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedCongestionMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.watch.MonitoredBuffer;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Provides for error-handling and input-side buffering for a feed runtime.
+ * The input handler is buffering in:
+ * 1. FeedMetaComputeNodePushable.initializeNewFeedRuntime();
+ * 2. FeedMetaStoreNodePushable.initializeNewFeedRuntime();
+ *              ______
+ *             |      |
+ * ============|core  |============
+ * ============| op   |============
+ * ^^^^^^^^^^^^|______|
+ *  Input Side
+ *  Handler
+ *
+ **/
+public class FeedRuntimeInputHandler implements IFrameWriter {
+
+    private static Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
+
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final FeedPolicyAccessor feedPolicyAccessor;
+    private final IExceptionHandler exceptionHandler;
+    private final FeedFrameDiscarder discarder;
+    private final FeedFrameSpiller spiller;
+    private final FeedPolicyAccessor fpa;
+    private final IFeedManager feedManager;
+    private boolean bufferingEnabled;
+    private IFrameWriter coreOperator;
+    private MonitoredBuffer mBuffer;
+    private DataBucketPool pool;
+    private FrameCollection frameCollection;
+    private Mode mode;
+    private Mode lastMode;
+    private boolean finished;
+    private long nProcessed;
+    private boolean throttlingEnabled;
+
+    private FrameEventCallback frameEventCallback;
+
+    public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IFrameWriter coreOperator, FeedPolicyAccessor fpa, FrameTupleAccessor fta, RecordDescriptor recordDesc,
+            IFeedManager feedManager, int nPartitions) throws HyracksDataException {
+        this(ctx, connectionId, runtimeId, coreOperator, fpa, fpa.bufferingEnabled(), fta, recordDesc, feedManager,
+                nPartitions);
+    }
+
+    public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
+            RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws HyracksDataException {
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.coreOperator = coreOperator;
+        this.bufferingEnabled = bufferingEnabled;
+        this.feedPolicyAccessor = fpa;
+        this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
+        this.discarder = new FeedFrameDiscarder(connectionId, runtimeId, fpa, this);
+        this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
+        this.mode = Mode.PROCESS;
+        this.lastMode = Mode.PROCESS;
+        this.finished = false;
+        this.fpa = fpa;
+        this.feedManager = feedManager;
+        this.pool = (DataBucketPool) feedManager.getFeedMemoryManager()
+                .getMemoryComponent(IFeedMemoryComponent.Type.POOL);
+        this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
+                .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
+        this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
+        this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
+                feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
+                nPartitions, fpa);
+        this.mBuffer.start();
+        this.throttlingEnabled = false;
+    }
+
+    @Override
+    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        try {
+            switch (mode) {
+                case PROCESS:
+                    switch (lastMode) {
+                        case SPILL:
+                        case POST_SPILL_DISCARD:
+                            setMode(Mode.PROCESS_SPILL);
+                            processSpilledBacklog();
+                            break;
+                        case STALL:
+                            setMode(Mode.PROCESS_BACKLOG);
+                            processBufferredBacklog();
+                            break;
+                        default:
+                            break;
+                    }
+                    process(frame);
+                    break;
+                case PROCESS_BACKLOG:
+                case PROCESS_SPILL:
+                    process(frame);
+                    break;
+                case SPILL:
+                    spill(frame);
+                    break;
+                case DISCARD:
+                case POST_SPILL_DISCARD:
+                    discard(frame);
+                    break;
+                case STALL:
+                    switch (runtimeId.getFeedRuntimeType()) {
+                        case COLLECT:
+                        case COMPUTE_COLLECT:
+                        case COMPUTE:
+                        case STORE:
+                            bufferDataUntilRecovery(frame);
+                            break;
+                        default:
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.warning("Discarding frame during " + mode + " mode " + this.runtimeId);
+                            }
+                            break;
+                    }
+                    break;
+                case END:
+                case FAIL:
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Ignoring incoming tuples in " + mode + " mode");
+                    }
+                    break;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void bufferDataUntilRecovery(ByteBuffer frame) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
+        }
+        if (frameCollection == null) {
+            this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
+                    .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
+        }
+        if (frameCollection == null) {
+            discarder.processMessage(frame);
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Running low on memory! DISCARDING FRAME ");
+            }
+        } else {
+            boolean success = frameCollection.addFrame(frame);
+            if (!success) {
+                if (fpa.spillToDiskOnCongestion()) {
+                    if (frame != null) {
+                        spiller.processMessage(frame);
+                    } // TODO handle the else case
+                } else {
+                    discarder.processMessage(frame);
+                }
+            }
+        }
+    }
+
+    public void reportUnresolvableCongestion() throws HyracksDataException {
+        if (this.runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE)) {
+            FeedCongestionMessage congestionReport = new FeedCongestionMessage(connectionId, runtimeId,
+                    mBuffer.getInflowRate(), mBuffer.getOutflowRate(), mode);
+            feedManager.getFeedMessageService().sendMessage(congestionReport);
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Congestion reported " + this.connectionId + " " + this.runtimeId);
+            }
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unresolvable congestion at " + this.connectionId + " " + this.runtimeId);
+            }
+        }
+    }
+
+    private void processBufferredBacklog() throws HyracksDataException {
+        try {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Processing backlog " + this.runtimeId);
+            }
+
+            if (frameCollection != null) {
+                Iterator<ByteBuffer> backlog = frameCollection.getFrameCollectionIterator();
+                while (backlog.hasNext()) {
+                    process(backlog.next());
+                    nProcessed++;
+                }
+                DataBucket bucket = pool.getDataBucket();
+                bucket.setContentType(ContentType.EOSD);
+                bucket.setDesiredReadCount(1);
+                mBuffer.sendMessage(bucket);
+                feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+                frameCollection = null;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void processSpilledBacklog() throws HyracksDataException {
+        try {
+            Iterator<ByteBuffer> backlog = spiller.replayData();
+            while (backlog.hasNext()) {
+                process(backlog.next());
+                nProcessed++;
+            }
+            DataBucket bucket = pool.getDataBucket();
+            bucket.setContentType(ContentType.EOSD);
+            bucket.setDesiredReadCount(1);
+            mBuffer.sendMessage(bucket);
+            spiller.reset();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    protected void process(ByteBuffer frame) throws HyracksDataException {
+        boolean frameProcessed = false;
+        while (!frameProcessed) {
+            try {
+                if (!bufferingEnabled) {
+                    if (frame == null) {
+                        setFinished(true);
+                        synchronized (coreOperator) {
+                            coreOperator.notifyAll();
+                        }
+                    } else {
+                        coreOperator.nextFrame(frame); // synchronous
+                        mBuffer.sendReport(frame);
+                    }
+                } else {
+                    DataBucket bucket = pool.getDataBucket();
+                    if (bucket != null) {
+                        if (frame != null) {
+                            bucket.reset(frame); // created a copy here
+                            bucket.setContentType(ContentType.DATA);
+                        } else {
+                            bucket.setContentType(ContentType.EOD);
+                        }
+                        bucket.setDesiredReadCount(1);
+                        mBuffer.sendMessage(bucket);
+                        mBuffer.sendReport(frame);
+                        nProcessed++;
+                    } else {
+                        if (fpa.spillToDiskOnCongestion()) {
+                            if (frame != null) {
+                                boolean spilled = spiller.processMessage(frame);
+                                if (spilled) {
+                                    setMode(Mode.SPILL);
+                                } else {
+                                    reportUnresolvableCongestion();
+                                }
+                            }
+                        } else if (fpa.discardOnCongestion()) {
+                            boolean discarded = discarder.processMessage(frame);
+                            if (!discarded) {
+                                reportUnresolvableCongestion();
+                            }
+                        } else if (fpa.throttlingEnabled()) {
+                            setThrottlingEnabled(true);
+                        } else {
+                            reportUnresolvableCongestion();
+                        }
+
+                    }
+                }
+                frameProcessed = true;
+            } catch (Exception e) {
+                e.printStackTrace();
+                if (feedPolicyAccessor.continueOnSoftFailure()) {
+                    frame = exceptionHandler.handleException(e, frame);
+                    if (frame == null) {
+                        frameProcessed = true;
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Encountered exception! " + e.getMessage()
+                                    + "Insufficient information, Cannot extract failing tuple");
+                        }
+                    }
+                } else {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Ingestion policy does not require recovering from tuple. Feed would terminate");
+                    }
+                    mBuffer.close(false);
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+    }
+
+    private void spill(ByteBuffer frame) throws Exception {
+        boolean success = spiller.processMessage(frame);
+        if (!success) {
+            // limit reached
+            setMode(Mode.POST_SPILL_DISCARD);
+            reportUnresolvableCongestion();
+        }
+    }
+
+    private void discard(ByteBuffer frame) throws Exception {
+        boolean success = discarder.processMessage(frame);
+        if (!success) { // limit reached
+            reportUnresolvableCongestion();
+        }
+    }
+
+    public Mode getMode() {
+        return mode;
+    }
+
+    public synchronized void setMode(Mode mode) {
+        if (mode.equals(this.mode)) {
+            return;
+        }
+        this.lastMode = this.mode;
+        this.mode = mode;
+        if (mode.equals(Mode.END)) {
+            this.close();
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Switched from " + lastMode + " to " + mode + " " + this.runtimeId);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (mBuffer != null) {
+            boolean disableMonitoring = !this.mode.equals(Mode.STALL);
+            if (frameCollection != null) {
+                feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+            }
+            if (pool != null) {
+                feedManager.getFeedMemoryManager().releaseMemoryComponent(pool);
+            }
+            mBuffer.close(false, disableMonitoring);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Closed input side handler for " + this.runtimeId + " disabled monitoring "
+                        + disableMonitoring + " Mode for runtime " + this.mode);
+            }
+        }
+    }
+
+    public IFrameWriter getCoreOperator() {
+        return coreOperator;
+    }
+
+    public void setCoreOperator(IFrameWriter coreOperator) {
+        this.coreOperator = coreOperator;
+        mBuffer.setFrameWriter(coreOperator);
+        frameEventCallback.setCoreOperator(coreOperator);
+    }
+
+    public boolean isFinished() {
+        return finished;
+    }
+
+    public void setFinished(boolean finished) {
+        this.finished = finished;
+    }
+
+    public long getProcessed() {
+        return nProcessed;
+    }
+
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        coreOperator.open();
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        coreOperator.fail();
+    }
+
+    public void reset(int nPartitions) {
+        this.mBuffer.setNumberOfPartitions(nPartitions);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Reset number of partitions to " + nPartitions + " for " + this.runtimeId);
+        }
+        if (mBuffer != null) {
+            mBuffer.reset();
+        }
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public IFeedManager getFeedManager() {
+        return feedManager;
+    }
+
+    public MonitoredBuffer getmBuffer() {
+        return mBuffer;
+    }
+
+    public boolean isThrottlingEnabled() {
+        return throttlingEnabled;
+    }
+
+    public void setThrottlingEnabled(boolean throttlingEnabled) {
+        if (this.throttlingEnabled != throttlingEnabled) {
+            this.throttlingEnabled = throttlingEnabled;
+            IFeedMessage throttlingEnabledMesg = new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+            feedManager.getFeedMessageService().sendMessage(throttlingEnabledMesg);
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Throttling " + throttlingEnabled + " for " + this.connectionId + "[" + runtimeId + "]");
+            }
+        }
+    }
+
+    public boolean isBufferingEnabled() {
+        return bufferingEnabled;
+    }
+
+    public void setBufferingEnabled(boolean bufferingEnabled) {
+        this.bufferingEnabled = bufferingEnabled;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
new file mode 100644
index 0000000..7980712
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameCollection.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
+
+/**
+ * Represents an expandable collection of frames.
+ */
+public class FrameCollection implements IFeedMemoryComponent {
+
+    /** A unique identifier for the feed memory component **/
+    private final int componentId;
+
+    /** A collection of frames (each being a ByteBuffer) **/
+    private final List<ByteBuffer> frames = new LinkedList<ByteBuffer>();
+
+    /** The permitted maximum size, the collection may grow to **/
+    private int maxSize;
+
+    /** The {@link IFeedMemoryManager} for the NodeController **/
+    private final IFeedMemoryManager memoryManager;
+
+    public FrameCollection(int componentId, IFeedMemoryManager memoryManager, int maxSize) {
+        this.componentId = componentId;
+        this.maxSize = maxSize;
+        this.memoryManager = memoryManager;
+    }
+
+    public boolean addFrame(ByteBuffer frame) {
+        if (frames.size() == maxSize) {
+            boolean expansionGranted = memoryManager.expandMemoryComponent(this);
+            if (!expansionGranted) {
+                return false;
+            }
+        }
+        ByteBuffer storageBuffer = ByteBuffer.allocate(frame.capacity());
+        storageBuffer.put(frame);
+        frames.add(storageBuffer);
+        storageBuffer.flip();
+        return true;
+    }
+
+    public Iterator<ByteBuffer> getFrameCollectionIterator() {
+        return frames.iterator();
+    }
+
+    @Override
+    public int getTotalAllocation() {
+        return frames.size();
+    }
+
+    @Override
+    public Type getType() {
+        return Type.COLLECTION;
+    }
+
+    @Override
+    public int getComponentId() {
+        return componentId;
+    }
+
+    @Override
+    public void expand(int delta) {
+        maxSize = maxSize + delta;
+    }
+
+    @Override
+    public void reset() {
+        frames.clear();
+        maxSize = IFeedMemoryManager.START_COLLECTION_SIZE;
+    }
+
+    @Override
+    public String toString() {
+        return "FrameCollection" + "[" + componentId + "]" + "(" + frames.size() + "/" + maxSize + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
new file mode 100644
index 0000000..543efb2
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
+import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FrameDistributor {
+
+    private static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
+
+    private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
+
+    private final FeedId feedId;
+    private final FeedRuntimeType feedRuntimeType;
+    private final int partition;
+    private final IFeedMemoryManager memoryManager;
+    private final boolean enableSynchronousTransfer;
+    /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/
+    private final Map<IFrameWriter, FeedFrameCollector> registeredCollectors;
+    private final FrameTupleAccessor fta;
+
+    private DataBucketPool pool;
+    private DistributionMode distributionMode;
+    private boolean spillToDiskRequired = false;
+
+    public enum DistributionMode {
+        /**
+         * A single feed frame collector is registered for receiving tuples.
+         * Tuple is sent via synchronous call, that is no buffering is involved
+         **/
+        SINGLE,
+
+        /**
+         * Multiple feed frame collectors are concurrently registered for
+         * receiving tuples.
+         **/
+        SHARED,
+
+        /**
+         * Feed tuples are not being processed, irrespective of # of registered
+         * feed frame collectors.
+         **/
+        INACTIVE
+    }
+
+    public FrameDistributor(FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
+            boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
+                    throws HyracksDataException {
+        this.feedId = feedId;
+        this.feedRuntimeType = feedRuntimeType;
+        this.partition = partition;
+        this.memoryManager = memoryManager;
+        this.enableSynchronousTransfer = enableSynchronousTransfer;
+        this.registeredCollectors = new HashMap<IFrameWriter, FeedFrameCollector>();
+        this.distributionMode = DistributionMode.INACTIVE;
+        this.fta = fta;
+    }
+
+    public void notifyEndOfFeed() {
+        DataBucket bucket = getDataBucket();
+        if (bucket != null) {
+            sendEndOfFeedDataBucket(bucket);
+        } else {
+            while (bucket == null) {
+                try {
+                    Thread.sleep(MEMORY_AVAILABLE_POLL_PERIOD);
+                    bucket = getDataBucket();
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+            if (bucket != null) {
+                sendEndOfFeedDataBucket(bucket);
+            }
+        }
+    }
+
+    private void sendEndOfFeedDataBucket(DataBucket bucket) {
+        bucket.setContentType(DataBucket.ContentType.EOD);
+        nextBucket(bucket);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("End of feed data packet sent " + this.feedId);
+        }
+    }
+
+    public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) {
+        DistributionMode currentMode = distributionMode;
+        switch (distributionMode) {
+            case INACTIVE:
+                if (!enableSynchronousTransfer) {
+                    pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+                    frameCollector.start();
+                }
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                setMode(DistributionMode.SINGLE);
+                break;
+            case SINGLE:
+                pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                for (FeedFrameCollector reader : registeredCollectors.values()) {
+                    reader.start();
+                }
+                setMode(DistributionMode.SHARED);
+                break;
+            case SHARED:
+                frameCollector.start();
+                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
+                break;
+        }
+        evaluateIfSpillIsEnabled();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(
+                    "Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id " + feedId);
+        }
+    }
+
+    public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
+        switch (distributionMode) {
+            case INACTIVE:
+                throw new IllegalStateException(
+                        "Invalid attempt to deregister frame collector in " + distributionMode + " mode.");
+            case SHARED:
+                frameCollector.closeCollector();
+                registeredCollectors.remove(frameCollector.getFrameWriter());
+                int nCollectors = registeredCollectors.size();
+                if (nCollectors == 1) {
+                    FeedFrameCollector loneCollector = registeredCollectors.values().iterator().next();
+                    setMode(DistributionMode.SINGLE);
+                    loneCollector.setState(FeedFrameCollector.State.TRANSITION);
+                    loneCollector.closeCollector();
+                    memoryManager.releaseMemoryComponent(pool);
+                    evaluateIfSpillIsEnabled();
+                } else {
+                    if (!spillToDiskRequired) {
+                        evaluateIfSpillIsEnabled();
+                    }
+                }
+                break;
+            case SINGLE:
+                frameCollector.closeCollector();
+                setMode(DistributionMode.INACTIVE);
+                spillToDiskRequired = false;
+                break;
+
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Deregistered frame reader" + frameCollector + " from feed distributor for " + feedId);
+        }
+    }
+
+    public void evaluateIfSpillIsEnabled() {
+        if (!spillToDiskRequired) {
+            for (FeedFrameCollector collector : registeredCollectors.values()) {
+                spillToDiskRequired = spillToDiskRequired
+                        || collector.getFeedPolicyAccessor().spillToDiskOnCongestion();
+                if (spillToDiskRequired) {
+                    break;
+                }
+            }
+        }
+    }
+
+    public boolean deregisterFrameCollector(IFrameWriter frameWriter) {
+        FeedFrameCollector collector = registeredCollectors.get(frameWriter);
+        if (collector != null) {
+            deregisterFrameCollector(collector);
+            return true;
+        }
+        return false;
+    }
+
+    public synchronized void setMode(DistributionMode mode) {
+        this.distributionMode = mode;
+    }
+
+    public boolean isRegistered(IFrameWriter writer) {
+        return registeredCollectors.get(writer) != null;
+    }
+
+    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        switch (distributionMode) {
+            case INACTIVE:
+                break;
+            case SINGLE:
+                FeedFrameCollector collector = registeredCollectors.values().iterator().next();
+                switch (collector.getState()) {
+                    case HANDOVER:
+                    case ACTIVE:
+                        if (enableSynchronousTransfer) {
+                            collector.nextFrame(frame); // processing is synchronous
+                        } else {
+                            handleDataBucket(frame);
+                        }
+                        break;
+                    case TRANSITION:
+                        handleDataBucket(frame);
+                        break;
+                    case FINISHED:
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Discarding fetched tuples, feed has ended [" + registeredCollectors.get(0)
+                                    + "]" + " Feed Id " + feedId + " frame distributor " + this.getFeedRuntimeType());
+                        }
+                        registeredCollectors.remove(0);
+                        break;
+                }
+                break;
+            case SHARED:
+                handleDataBucket(frame);
+                break;
+        }
+    }
+
+    private void nextBucket(DataBucket bucket) {
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            collector.sendMessage(bucket); // asynchronous call
+        }
+    }
+
+    private void handleDataBucket(ByteBuffer frame) throws HyracksDataException {
+        DataBucket bucket = getDataBucket();
+        if (bucket == null) {
+            handleFrameDuringMemoryCongestion(frame);
+        } else {
+            bucket.reset(frame);
+            bucket.setDesiredReadCount(registeredCollectors.size());
+            nextBucket(bucket);
+        }
+    }
+
+    private void handleFrameDuringMemoryCongestion(ByteBuffer frame) throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Unable to allocate memory, will evaluate the need to spill");
+        }
+        // wait till memory is available
+    }
+
+    private DataBucket getDataBucket() {
+        DataBucket bucket = null;
+        if (pool != null) {
+            bucket = pool.getDataBucket();
+            if (bucket != null) {
+                bucket.setDesiredReadCount(registeredCollectors.size());
+                return bucket;
+            } else {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    public DistributionMode getMode() {
+        return distributionMode;
+    }
+
+    public void close() {
+        switch (distributionMode) {
+            case INACTIVE:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("FrameDistributor is " + distributionMode);
+                }
+                break;
+            case SINGLE:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Disconnecting single frame reader in " + distributionMode + " mode " + " for  feedId "
+                            + feedId + " " + this.feedRuntimeType);
+                }
+                setMode(DistributionMode.INACTIVE);
+                if (!enableSynchronousTransfer) {
+                    notifyEndOfFeed(); // send EOD Data Bucket
+                    waitForCollectorsToFinish();
+                }
+                registeredCollectors.values().iterator().next().disconnect();
+                break;
+            case SHARED:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Signalling End Of Feed; currently operating in " + distributionMode + " mode");
+                }
+                notifyEndOfFeed(); // send EOD Data Bucket
+                waitForCollectorsToFinish();
+                break;
+        }
+    }
+
+    private void waitForCollectorsToFinish() {
+        synchronized (registeredCollectors.values()) {
+            while (!allCollectorsFinished()) {
+                try {
+                    registeredCollectors.values().wait();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private boolean allCollectorsFinished() {
+        boolean allFinished = true;
+        for (FeedFrameCollector collector : registeredCollectors.values()) {
+            allFinished = allFinished && collector.getState().equals(FeedFrameCollector.State.FINISHED);
+        }
+        return allFinished;
+    }
+
+    public Collection<FeedFrameCollector> getRegisteredCollectors() {
+        return registeredCollectors.values();
+    }
+
+    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
+        return registeredCollectors;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public DistributionMode getDistributionMode() {
+        return distributionMode;
+    }
+
+    public FeedRuntimeType getFeedRuntimeType() {
+        return feedRuntimeType;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public FrameTupleAccessor getFta() {
+        return fta;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
new file mode 100644
index 0000000..ba67862
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameEventCallback.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFrameEventCallback;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FrameEventCallback implements IFrameEventCallback {
+
+    private static final Logger LOGGER = Logger.getLogger(FrameEventCallback.class.getName());
+
+    private final FeedPolicyAccessor fpa;
+    private final FeedRuntimeInputHandler inputSideHandler;
+    private IFrameWriter coreOperator;
+
+    public FrameEventCallback(FeedPolicyAccessor fpa, FeedRuntimeInputHandler inputSideHandler,
+            IFrameWriter coreOperator) {
+        this.fpa = fpa;
+        this.inputSideHandler = inputSideHandler;
+        this.coreOperator = coreOperator;
+    }
+
+    @Override
+    public void frameEvent(FrameEvent event) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Frame Event for " + inputSideHandler.getRuntimeId() + " " + event);
+        }
+        if (!event.equals(FrameEvent.FINISHED_PROCESSING_SPILLAGE)
+                && inputSideHandler.getMode().equals(Mode.PROCESS_SPILL)) {
+            return;
+        }
+        switch (event) {
+            case PENDING_WORK_THRESHOLD_REACHED:
+                if (fpa.spillToDiskOnCongestion()) {
+                    inputSideHandler.setMode(Mode.SPILL);
+                } else if (fpa.discardOnCongestion()) {
+                    inputSideHandler.setMode(Mode.DISCARD);
+                } else if (fpa.throttlingEnabled()) {
+                    inputSideHandler.setThrottlingEnabled(true);
+                } else {
+                    try {
+                        inputSideHandler.reportUnresolvableCongestion();
+                    } catch (HyracksDataException e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to report congestion!!!");
+                        }
+                    }
+                }
+                break;
+            case FINISHED_PROCESSING:
+                inputSideHandler.setFinished(true);
+                synchronized (coreOperator) {
+                    coreOperator.notifyAll();
+                }
+                break;
+            case PENDING_WORK_DONE:
+                switch (inputSideHandler.getMode()) {
+                    case SPILL:
+                    case DISCARD:
+                    case POST_SPILL_DISCARD:
+                        inputSideHandler.setMode(Mode.PROCESS);
+                        break;
+                    default:
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Received " + event + " ignoring as operating in " + inputSideHandler.getMode());
+                        }
+                }
+                break;
+            case FINISHED_PROCESSING_SPILLAGE:
+                inputSideHandler.setMode(Mode.PROCESS);
+                break;
+            default:
+                break;
+        }
+    }
+
+    public void setCoreOperator(IFrameWriter coreOperator) {
+        this.coreOperator = coreOperator;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
new file mode 100644
index 0000000..22dcfac
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.asterix.external.feed.watch.IntakePartitionStatistics;
+import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class StorageFrameHandler {
+
+    private final Map<Integer, Map<Integer, IntakePartitionStatistics>> intakeStatistics;
+    private long avgDelayPersistence;
+
+    public StorageFrameHandler() {
+        intakeStatistics = new HashMap<Integer, Map<Integer, IntakePartitionStatistics>>();
+        avgDelayPersistence = 0L;
+    }
+
+    public synchronized void updateTrackingInformation(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
+        int nTuples = frameAccessor.getTupleCount();
+        long delay = 0;
+        long intakeTimestamp;
+        long currentTime = System.currentTimeMillis();
+        int partition = 0;
+        int recordId = 0;
+        for (int i = 0; i < nTuples; i++) {
+            int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
+            int openPartOffsetOrig = frame.getInt(recordStart + 6);
+            int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
+
+            int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+                    + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
+            recordId = frame.getInt(recordStart + recordIdOffset);
+
+            int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2) + 1;
+            partition = frame.getInt(recordStart + partitionOffset);
+
+            ackRecordId(partition, recordId);
+            int intakeTimestampValueOffset = partitionOffset + 4 + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2)
+                    + 1;
+            intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
+
+            int storeTimestampValueOffset = intakeTimestampValueOffset + 8
+                    + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
+            frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
+            delay += currentTime - intakeTimestamp;
+        }
+        avgDelayPersistence = delay / nTuples;
+    }
+
+    private void ackRecordId(int partition, int recordId) {
+        Map<Integer, IntakePartitionStatistics> map = intakeStatistics.get(partition);
+        if (map == null) {
+            map = new HashMap<Integer, IntakePartitionStatistics>();
+            intakeStatistics.put(partition, map);
+        }
+        int base = (int) Math.ceil(recordId * 1.0 / IntakePartitionStatistics.ACK_WINDOW_SIZE);
+        IntakePartitionStatistics intakeStatsForBaseOfPartition = map.get(base);
+        if (intakeStatsForBaseOfPartition == null) {
+            intakeStatsForBaseOfPartition = new IntakePartitionStatistics(partition, base);
+            map.put(base, intakeStatsForBaseOfPartition);
+        }
+        intakeStatsForBaseOfPartition.ackRecordId(recordId);
+    }
+
+    public byte[] getAckData(int partition, int base) {
+        Map<Integer, IntakePartitionStatistics> intakeStats = intakeStatistics.get(partition);
+        if (intakeStats != null) {
+            IntakePartitionStatistics intakePartitionStats = intakeStats.get(base);
+            if (intakePartitionStats != null) {
+                return intakePartitionStats.getAckInfo();
+            }
+        }
+        return null;
+    }
+
+    public synchronized Map<Integer, IntakePartitionStatistics> getBaseAcksForPartition(int partition) {
+        Map<Integer, IntakePartitionStatistics> intakeStatsForPartition = intakeStatistics.get(partition);
+        Map<Integer, IntakePartitionStatistics> clone = new HashMap<Integer, IntakePartitionStatistics>();
+        for (Entry<Integer, IntakePartitionStatistics> entry : intakeStatsForPartition.entrySet()) {
+            clone.put(entry.getKey(), entry.getValue());
+        }
+        return intakeStatsForPartition;
+    }
+
+    public long getAvgDelayPersistence() {
+        return avgDelayPersistence;
+    }
+
+    public void setAvgDelayPersistence(long avgDelayPersistence) {
+        this.avgDelayPersistence = avgDelayPersistence;
+    }
+
+    public Set<Integer> getPartitionsWithStats() {
+        return intakeStatistics.keySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
new file mode 100644
index 0000000..9f861d4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedCollectInfo extends FeedInfo {
+    public FeedId sourceFeedId;
+    public FeedConnectionId feedConnectionId;
+    public List<String> collectLocations = new ArrayList<String>();
+    public List<String> computeLocations = new ArrayList<String>();
+    public List<String> storageLocations = new ArrayList<String>();
+    public Map<String, String> feedPolicy;
+    public String superFeedManagerHost;
+    public int superFeedManagerPort;
+    public boolean fullyConnected;
+
+    public FeedCollectInfo(FeedId sourceFeedId, FeedConnectionId feedConnectionId, JobSpecification jobSpec,
+            JobId jobId, Map<String, String> feedPolicy) {
+        super(jobSpec, jobId, FeedInfoType.COLLECT);
+        this.sourceFeedId = sourceFeedId;
+        this.feedConnectionId = feedConnectionId;
+        this.feedPolicy = feedPolicy;
+        this.fullyConnected = true;
+    }
+
+    @Override
+    public String toString() {
+        return FeedInfoType.COLLECT + "[" + feedConnectionId + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
new file mode 100644
index 0000000..1af7153
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.io.Serializable;
+
+/**
+ * A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing into a
+ * dataset.
+ */
+public class FeedConnectionId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedId feedId;            // Dataverse - Feed
+    private final String datasetName;       // Dataset
+
+    public FeedConnectionId(FeedId feedId, String datasetName) {
+        this.feedId = feedId;
+        this.datasetName = datasetName;
+    }
+
+    public FeedConnectionId(String dataverse, String feedName, String datasetName) {
+        this.feedId = new FeedId(dataverse, feedName);
+        this.datasetName = datasetName;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o instanceof FeedConnectionId)) {
+            return false;
+        }
+
+        if (this == o || (((FeedConnectionId) o).getFeedId().equals(feedId)
+                && ((FeedConnectionId) o).getDatasetName().equals(datasetName))) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return feedId.toString() + "-->" + datasetName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
new file mode 100644
index 0000000..dd2fc60
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedConnectionManager;
+import org.apache.asterix.external.feed.runtime.FeedRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+
+/**
+ * An implementation of the IFeedManager interface.
+ * Provider necessary central repository for registering/retrieving
+ * artifacts/services associated with a feed.
+ */
+public class FeedConnectionManager implements IFeedConnectionManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
+
+    private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
+    private final String nodeId;
+
+    public FeedConnectionManager(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
+        return feedRuntimeManagers.get(feedId);
+    }
+
+    @Override
+    public void deregisterFeed(FeedConnectionId feedId) {
+        try {
+            FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
+            if (mgr != null) {
+                mgr.close();
+                feedRuntimeManagers.remove(feedId);
+            }
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
+            }
+        }
+
+    }
+
+    @Override
+    public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
+            throws Exception {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        if (runtimeMgr == null) {
+            runtimeMgr = new FeedRuntimeManager(connectionId, this);
+            feedRuntimeManagers.put(connectionId, runtimeMgr);
+        }
+        runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
+    }
+
+    @Override
+    public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        if (runtimeMgr != null) {
+            runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
+        }
+    }
+
+    @Override
+    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
+        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
+        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
+    }
+
+    @Override
+    public String toString() {
+        return "FeedManager " + "[" + nodeId + "]";
+    }
+
+    @Override
+    public List<FeedRuntimeId> getRegisteredRuntimes() {
+        List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
+        for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
+            runtimes.addAll(entry.getValue().getFeedRuntimes());
+        }
+        return runtimes;
+    }
+}