You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:12 UTC

[15/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java
new file mode 100644
index 0000000..9567307
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameTupleAccessor.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants.StatisticsConstants;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FeedFrameTupleAccessor implements IFrameTupleAccessor {
+
+    private final FrameTupleAccessor frameAccessor;
+    private final int numOpenFields;
+
+    public FeedFrameTupleAccessor(FrameTupleAccessor frameAccessor) {
+        this.frameAccessor = frameAccessor;
+        int firstRecordStart = frameAccessor.getTupleStartOffset(0) + frameAccessor.getFieldSlotsLength();
+        int openPartOffsetOrig = frameAccessor.getBuffer().getInt(firstRecordStart + 6);
+        numOpenFields = frameAccessor.getBuffer().getInt(firstRecordStart + openPartOffsetOrig);
+    }
+
+    public int getFeedIntakePartition(int tupleIndex) {
+        ByteBuffer buffer = frameAccessor.getBuffer();
+        int recordStart = frameAccessor.getTupleStartOffset(tupleIndex) + frameAccessor.getFieldSlotsLength();
+        int openPartOffsetOrig = buffer.getInt(recordStart + 6);
+        int partitionOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
+                + StatisticsConstants.INTAKE_PARTITION.length() + 2 + 1;
+        return buffer.getInt(recordStart + partitionOffset);
+    }
+    
+    
+
+    @Override
+    public int getFieldCount() {
+        return frameAccessor.getFieldCount();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return frameAccessor.getFieldSlotsLength();
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldEndOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return frameAccessor.getFieldLength(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return frameAccessor.getTupleEndOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return frameAccessor.getTupleStartOffset(tupleIndex);
+    }
+
+    @Override
+    public int getTupleCount() {
+        return frameAccessor.getTupleCount();
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return frameAccessor.getBuffer();
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        frameAccessor.reset(buffer);
+    }
+
+    @Override
+    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
+        return frameAccessor.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+    }
+
+    @Override
+    public int getTupleLength(int tupleIndex) {
+        return frameAccessor.getTupleLength(tupleIndex);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java
new file mode 100644
index 0000000..927d10d
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedFrameUtil.java
@@ -0,0 +1,84 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Static helpers that copy a subset of the tuples of a frame into a newly created frame.
+ */
+public class FeedFrameUtil {
+
+    /**
+     * Builds a new frame holding every tuple that follows {@code tupleIndex} in the
+     * accessor's frame, in their original order.
+     *
+     * @param ctx
+     *            task context used to create the new frame
+     * @param tupleIndex
+     *            index of the last tuple to leave out; copying starts at {@code tupleIndex + 1}
+     * @param fta
+     *            accessor positioned on the source frame
+     * @return the buffer of the new frame
+     * @throws HyracksDataException
+     *             if creating the frame or appending a tuple fails
+     */
+    public static ByteBuffer getSlicedFrame(IHyracksTaskContext ctx, int tupleIndex, FrameTupleAccessor fta)
+            throws HyracksDataException {
+        FrameTupleAppender appender = new FrameTupleAppender();
+        IFrame slicedFrame = new VSizeFrame(ctx);
+        appender.reset(slicedFrame, true);
+        int totalTuples = fta.getTupleCount();
+        for (int ti = tupleIndex + 1; ti < totalTuples; ti++) {
+            appender.append(fta, ti);
+        }
+        return slicedFrame.getBuffer();
+    }
+
+    /**
+     * Builds a new frame holding a random selection of distinct tuples from the
+     * accessor's frame. At most {@code fta.getTupleCount()} tuples are copied, so a
+     * {@code sampleSize} larger than the frame yields every tuple exactly once.
+     *
+     * @param ctx
+     *            task context used to create the new frame
+     * @param fta
+     *            accessor positioned on the source frame
+     * @param sampleSize
+     *            number of tuples requested
+     * @return the buffer of the new frame
+     * @throws HyracksDataException
+     *             if creating the frame or appending a tuple fails
+     */
+    public static ByteBuffer getSampledFrame(IHyracksTaskContext ctx, FrameTupleAccessor fta, int sampleSize)
+            throws HyracksDataException {
+        NChooseKIterator it = new NChooseKIterator(fta.getTupleCount(), sampleSize);
+        FrameTupleAppender appender = new FrameTupleAppender();
+        IFrame sampledFrame = new VSizeFrame(ctx);
+        appender.reset(sampledFrame, true);
+        while (it.hasNext()) {
+            appender.append(fta, it.next());
+        }
+        return sampledFrame.getBuffer();
+    }
+
+    /**
+     * Yields up to {@code k} distinct indexes from {@code [0, n)}. Each call to
+     * {@link #next()} picks a random starting offset and returns the first index at or
+     * after it (wrapping to 0) that has not been returned yet.
+     */
+    private static class NChooseKIterator {
+
+        private final int n;
+        private final int k;
+        private final BitSet bSet;
+        private final Random random;
+
+        private int traversed = 0;
+
+        public NChooseKIterator(int n, int k) {
+            this.n = Math.max(n, 0);
+            // Never promise more picks than there are candidates; otherwise next()
+            // would search an exhausted bit set forever.
+            this.k = Math.min(Math.max(k, 0), this.n);
+            this.bSet = new BitSet(this.n);
+            // BitSet.set(from, to) excludes 'to', so the upper bound must be n (not n - 1)
+            // for the last index to be eligible.
+            bSet.set(0, this.n);
+            this.random = new Random();
+        }
+
+        public boolean hasNext() {
+            return traversed < k;
+        }
+
+        /**
+         * @return the next sampled index, or -1 when no picks remain
+         */
+        public int next() {
+            if (!hasNext()) {
+                return -1;
+            }
+            traversed++;
+            // hasNext() implies at least one bit is still set, so the wrap-around
+            // lookup below always succeeds.
+            int pos = bSet.nextSetBit(random.nextInt(n));
+            if (pos < 0) {
+                pos = bSet.nextSetBit(0);
+            }
+            bSet.clear(pos);
+            return pos;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java
new file mode 100644
index 0000000..1e38f70
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedId.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.Serializable;
+
+/**
+ * A unique identifier for a data feed.
+ */
+public class FeedId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String dataverse;
+    private final String feedName;
+
+    public FeedId(String dataverse, String feedName) {
+        this.dataverse = dataverse;
+        this.feedName = feedName;
+    }
+
+    public String getDataverse() {
+        return dataverse;
+    }
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o instanceof FeedId)) {
+            return false;
+        }
+        if (this == o || ((FeedId) o).getFeedName().equals(feedName) && ((FeedId) o).getDataverse().equals(dataverse)) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return dataverse + "." + feedName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java
new file mode 100644
index 0000000..8ed3cf1
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedIntakeInfo.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Job information for a feed intake job: the feed being ingested, the feed joint at
+ * the intake, and the locations of the intake. The job specification is held by the
+ * superclass, so {@code getSpec()} and {@code setSpec(...)} operate on the same value.
+ */
+public class FeedIntakeInfo extends FeedJobInfo {
+
+    private final FeedId feedId;
+    private final IFeedJoint intakeFeedJoint;
+    private List<String> intakeLocation;
+
+    /**
+     * @param jobId
+     *            id of the intake job
+     * @param state
+     *            initial state of the job
+     * @param jobType
+     *            ignored; the job type is always {@link FeedJobInfo.JobType#INTAKE}.
+     *            The parameter is retained so existing callers keep compiling.
+     * @param feedId
+     *            id of the feed being ingested
+     * @param intakeFeedJoint
+     *            feed joint associated with the intake
+     * @param spec
+     *            job specification, stored by the superclass
+     */
+    public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
+            JobSpecification spec) {
+        // The spec is kept only in FeedJobInfo. A second copy in this class would go
+        // stale as soon as setSpec(...) is called on the superclass.
+        super(jobId, state, FeedJobInfo.JobType.INTAKE, spec);
+        this.feedId = feedId;
+        this.intakeFeedJoint = intakeFeedJoint;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public IFeedJoint getIntakeFeedJoint() {
+        return intakeFeedJoint;
+    }
+
+    /**
+     * @return the intake locations, or {@code null} if {@link #setIntakeLocation(List)}
+     *         has not been called
+     */
+    public List<String> getIntakeLocation() {
+        return intakeLocation;
+    }
+
+    public void setIntakeLocation(List<String> intakeLocation) {
+        this.intakeLocation = intakeLocation;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java
new file mode 100644
index 0000000..c50ca43
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJobInfo.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedJobInfo {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
+
+    public enum JobType {
+        INTAKE,
+        FEED_CONNECT
+    }
+
+    public enum FeedJobState {
+        CREATED,
+        ACTIVE,
+        UNDER_RECOVERY,
+        ENDED
+    }
+
+    protected final JobId jobId;
+    protected final JobType jobType;
+    protected FeedJobState state;
+    protected JobSpecification spec;
+
+    public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
+        this.jobId = jobId;
+        this.state = state;
+        this.jobType = jobType;
+        this.spec = spec;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public FeedJobState getState() {
+        return state;
+    }
+
+    public void setState(FeedJobState state) {
+        this.state = state;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(this + " is in " + state + " state.");
+        }
+    }
+
+    public JobType getJobType() {
+        return jobType;
+    }
+
+    public JobSpecification getSpec() {
+        return spec;
+    }
+
+    public void setSpec(JobSpecification spec) {
+        this.spec = spec;
+    }
+
+    public String toString() {
+        return jobId + " [" + jobType + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java
new file mode 100644
index 0000000..145ac32
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedJointKey.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Represents a unique identifier for a Feed Joint. A Feed joint is a logical entity located
+ * along a feed ingestion pipeline at a point where the tuples moving as part of data flow
+ * constitute the feed. The feed joint acts as a network tap and allows the flowing data to be
+ * routed to multiple paths.
+ */
+public class FeedJointKey {
+
+    private final FeedId primaryFeedId;
+    private final List<String> appliedFunctions;
+    private final String stringRep;
+
+    /**
+     * Builds the key and precomputes its string form: the feed id followed by ':' and
+     * the applied functions joined with ':'. Equality and hashing use that string form.
+     *
+     * @param feedId
+     *            id of the primary feed
+     * @param appliedFunctions
+     *            names of the functions applied; the list is stored as given (not copied)
+     */
+    public FeedJointKey(FeedId feedId, List<String> appliedFunctions) {
+        this.primaryFeedId = feedId;
+        this.appliedFunctions = appliedFunctions;
+        String functionPart = StringUtils.join(appliedFunctions, ':');
+        this.stringRep = feedId + ":" + functionPart;
+    }
+
+    public FeedId getFeedId() {
+        return primaryFeedId;
+    }
+
+    public List<String> getAppliedFunctions() {
+        return appliedFunctions;
+    }
+
+    public String getStringRep() {
+        return stringRep;
+    }
+
+    @Override
+    public final String toString() {
+        return stringRep;
+    }
+
+    @Override
+    public int hashCode() {
+        return stringRep.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FeedJointKey)) {
+            // also covers o == null
+            return false;
+        }
+        FeedJointKey other = (FeedJointKey) o;
+        return stringRep.equals(other.stringRep);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java
new file mode 100644
index 0000000..88e900d
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMemoryManager.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent.Type;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+
+public class FeedMemoryManager implements IFeedMemoryManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMemoryManager.class.getName());
+    private static final int ALLOCATION_INCREMENT = 10;
+
+    private final AtomicInteger componentId = new AtomicInteger(0);
+    private final String nodeId;
+    private final int budget;
+    private final int frameSize;
+
+    private int committed;
+
+    public FeedMemoryManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) {
+        this.nodeId = nodeId;
+        this.frameSize = frameSize;
+        budget = (int) feedProperties.getMemoryComponentGlobalBudget() / frameSize;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Feed Memory budget " + budget + " frames (frame size=" + frameSize + ")");
+        }
+    }
+
+    @Override
+    public synchronized IFeedMemoryComponent getMemoryComponent(Type type) {
+        IFeedMemoryComponent memoryComponent = null;
+        boolean valid = false;
+        switch (type) {
+            case COLLECTION:
+                valid = committed + START_COLLECTION_SIZE <= budget;
+                if (valid) {
+                    memoryComponent = new FrameCollection(componentId.incrementAndGet(), this, START_COLLECTION_SIZE);
+                }
+                break;
+            case POOL:
+                valid = committed + START_POOL_SIZE <= budget;
+                if (valid) {
+                    memoryComponent = new DataBucketPool(componentId.incrementAndGet(), this, START_POOL_SIZE,
+                            frameSize);
+                }
+                committed += START_POOL_SIZE;
+                break;
+        }
+        if (!valid) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to allocate memory component of type" + type);
+            }
+        }
+        return valid ? memoryComponent : null;
+    }
+
+    @Override
+    public synchronized boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent) {
+        if (committed + ALLOCATION_INCREMENT > budget) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Memory budget " + budget + " is exhausted. Space left: " + (budget - committed)
+                        + " frames.");
+            }
+            return false;
+        } else {
+            memoryComponent.expand(ALLOCATION_INCREMENT);
+            committed += ALLOCATION_INCREMENT;
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Expanded memory component " + memoryComponent + " by " + ALLOCATION_INCREMENT + " " + this);
+            }
+            return true;
+        }
+    }
+
+    @Override
+    public synchronized void releaseMemoryComponent(IFeedMemoryComponent memoryComponent) {
+        int delta = memoryComponent.getTotalAllocation();
+        committed -= delta;
+        memoryComponent.reset();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Reset " + memoryComponent + " and reclaimed " + delta + " frames " + this);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "FeedMemoryManager  [" + nodeId + "]" + "(" + committed + "/" + budget + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
index 3d0d8f9..320e6b9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMessageService.java
@@ -1,149 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.asterix.common.feeds;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.net.Socket;
-import java.net.UnknownHostException;
-import java.nio.CharBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+
 /**
  * Sends feed report messages on behalf of an operator instance
  * to the SuperFeedManager associated with the feed.
  */
-public class FeedMessageService {
+public class FeedMessageService implements IFeedMessageService {
 
     private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
 
-    public static final char MessageSeparator = '|';
-    private static final char EOL = (char) "\n".getBytes()[0];
-
-    private final FeedConnectionId feedId;
     private final LinkedBlockingQueue<String> inbox;
     private final FeedMessageHandler mesgHandler;
-    private final IFeedManager feedManager;
-
-    public FeedMessageService(FeedConnectionId feedId, IFeedManager feedManager) {
-        this.feedId = feedId;
-        inbox = new LinkedBlockingQueue<String>();
-        mesgHandler = new FeedMessageHandler(inbox, feedId, feedManager);
-        this.feedManager = feedManager;
+    private final String nodeId;
+    private ExecutorService executor;
+
+    public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
+        this.inbox = new LinkedBlockingQueue<String>();
+        this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
+        this.nodeId = nodeId;
+        this.executor = Executors.newSingleThreadExecutor();
     }
 
-    public void start() throws UnknownHostException, IOException, Exception {
-        feedManager.getFeedExecutorService(feedId).execute(mesgHandler);
+    public void start() throws Exception {
+
+        executor.execute(mesgHandler);
     }
 
-    public void stop() throws IOException {
+    public void stop() {
+        synchronized (mesgHandler.getLock()) {
+            executor.shutdownNow();
+        }
         mesgHandler.stop();
     }
 
-    public void sendMessage(String message) throws IOException {
-        inbox.add(message);
+    @Override
+    public void sendMessage(IFeedMessage message) {
+        try {
+            JSONObject obj = message.toJSON();
+            obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
+            obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
+            inbox.add(obj.toString());
+        } catch (JSONException jse) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("JSON exception in parsing message " + message + " exception [" + jse.getMessage() + "]");
+            }
+        }
     }
 
     private static class FeedMessageHandler implements Runnable {
 
         private final LinkedBlockingQueue<String> inbox;
-        private final FeedConnectionId feedId;
-        private Socket sfmSocket;
-        private boolean process = true;
-        private final IFeedManager feedManager;
+        private final String host;
+        private final int port;
+        private final Object lock;
 
-        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId, IFeedManager feedManager) {
+        private Socket cfmSocket;
+
+        private static final byte[] EOL = "\n".getBytes();
+
+        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
             this.inbox = inbox;
-            this.feedId = feedId;
-            this.feedManager = feedManager;
+            this.host = host;
+            this.port = port;
+            this.lock = new Object();
         }
 
         public void run() {
             try {
-                sfmSocket = obtainSFMSocket();
-                if (sfmSocket != null) {
-                    while (process) {
+                cfmSocket = new Socket(host, port);
+                if (cfmSocket != null) {
+                    while (true) {
                         String message = inbox.take();
-                        sfmSocket.getOutputStream().write(message.getBytes());
+                        synchronized (lock) { // lock prevents message handler from sending incomplete message midst shutdown attempt
+                            cfmSocket.getOutputStream().write(message.getBytes());
+                            cfmSocket.getOutputStream().write(EOL);
+                        }
                     }
                 } else {
                     if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to start feed message service for " + feedId);
+                        LOGGER.warning("Unable to start feed message service");
                     }
                 }
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ended feed message service for " + feedId);
-                }
             } catch (Exception e) {
+                e.printStackTrace();
                 if (LOGGER.isLoggable(Level.WARNING)) {
                     LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
                 }
             } finally {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Stopping feed message handler");
-                }
-                if (sfmSocket != null) {
-                    try {
-                        sfmSocket.close();
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Exception in closing socket " + e.getMessage());
-                        }
-                    }
-                }
+                stop();
             }
 
         }
 
         public void stop() {
-            process = false;
-        }
-
-        private Socket obtainSFMSocket() throws UnknownHostException, IOException, Exception {
-            Socket sfmDirServiceSocket = null;
-            SuperFeedManager sfm = feedManager.getSuperFeedManager(feedId);
-            try {
-                FeedRuntimeManager runtimeManager = feedManager.getFeedRuntimeManager(feedId);
-                sfmDirServiceSocket = runtimeManager.createClientSocket(sfm.getHost(), sfm.getPort(),
-                        IFeedManager.SOCKET_CONNECT_TIMEOUT);
-                if (sfmDirServiceSocket == null) {
+            if (cfmSocket != null) {
+                try {
+                    cfmSocket.close();
+                } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to connect to " + sfm.getHost() + "[" + sfm.getPort() + "]");
-                    }
-                } else {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info(" Connected to Super Feed Manager service " + sfm.getHost() + " " + sfm.getPort());
-                    }
-                    while (!sfmDirServiceSocket.isConnected()) {
-                        Thread.sleep(2000);
-                    }
-                    InputStream in = sfmDirServiceSocket.getInputStream();
-                    CharBuffer buffer = CharBuffer.allocate(50);
-                    char ch = 0;
-                    while (ch != EOL) {
-                        buffer.put(ch);
-                        ch = (char) in.read();
+                        LOGGER.warning("Exception in closing socket " + e.getMessage());
                     }
-                    buffer.flip();
-                    String s = new String(buffer.array());
-                    int port = Integer.parseInt(s.trim());
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Response from Super Feed Manager service " + port + " will connect at "
-                                + sfm.getHost() + " " + port);
-                    }
-                    sfmSocket = runtimeManager.createClientSocket(sfm.getHost(), port,
-                            IFeedManager.SOCKET_CONNECT_TIMEOUT);
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw e;
-            } finally {
-                if (sfmDirServiceSocket != null) {
-                    sfmDirServiceSocket.close();
                 }
             }
-            return sfmSocket;
         }
+
+        public Object getLock() {
+            return lock;
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java
new file mode 100644
index 0000000..ff943e1
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedMetricCollector.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+
+public class FeedMetricCollector implements IFeedMetricCollector {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetricCollector.class.getName());
+
+    private static final int UNKNOWN = -1;
+
+    private final String nodeId;
+    private final AtomicInteger globalSenderId = new AtomicInteger(1);
+    private final Map<Integer, Sender> senders = new HashMap<Integer, Sender>();
+    private final Map<Integer, Series> statHistory = new HashMap<Integer, Series>();
+    private final Map<String, Sender> sendersByName = new HashMap<String, Sender>();
+
+    public FeedMetricCollector(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public synchronized int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+            ValueType valueType, MetricType metricType) {
+        Sender sender = new Sender(globalSenderId.getAndIncrement(), connectionId, runtimeId, valueType, metricType);
+        senders.put(sender.senderId, sender);
+        sendersByName.put(sender.getDisplayName(), sender);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Sender id " + sender.getSenderId() + " created for " + sender);
+        }
+        return sender.senderId;
+    }
+
+    @Override
+    public void removeReportSender(int senderId) { // unregisters the sender and its series; throws if the id is unknown
+        Sender sender = senders.remove(senderId);
+        if (sender == null) {
+            LOGGER.warning("Unable to remove sender Id");
+            throw new IllegalStateException("Unable to remove sender Id " + senderId + " senders " + senders);
+        }
+        statHistory.remove(senderId);
+        // Also drop the by-name index entry (unless a newer sender with the same name owns it) so it cannot leak.
+        if (sendersByName.get(sender.getDisplayName()) == sender) {
+            sendersByName.remove(sender.getDisplayName());
+        }
+    }
+
+    @Override
+    public boolean sendReport(int senderId, int value) {
+        Sender sender = senders.get(senderId);
+        if (sender != null) {
+            Series series = statHistory.get(sender.senderId);
+            if (series == null) {
+                switch (sender.mType) {
+                    case AVG:
+                        series = new SeriesAvg();
+                        break;
+                    case RATE:
+                        series = new SeriesRate();
+                        break;
+                }
+                statHistory.put(sender.senderId, series);
+            }
+            series.addValue(value);
+            return true;
+        }
+        throw new IllegalStateException("Unable to send report sender Id " + senderId + " senders " + senders);
+    }
+
+    @Override
+    public void resetReportSender(int senderId) {
+        Sender sender = senders.get(senderId);
+        if (sender != null) {
+            Series series = statHistory.get(sender.senderId);
+            if (series != null) {
+                series.reset();
+            }
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Sender with id " + senderId + " not found. Unable to reset!");
+            }
+            throw new IllegalStateException("Unable to reset sender Id " + senderId + " senders " + senders);
+        }
+    }
+
+    private static class Sender {
+
+        private final int senderId;
+        private final MetricType mType;
+        private final String displayName;
+
+        public Sender(int senderId, FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
+                MetricType mType) {
+            this.senderId = senderId;
+            this.mType = mType;
+            this.displayName = createDisplayName(connectionId, runtimeId, valueType);
+        }
+
+        @Override
+        public String toString() {
+            return displayName + "[" + senderId + "]" + "(" + mType + ")";
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof Sender)) {
+                return false;
+            }
+            return ((Sender) o).senderId == senderId;
+        }
+
+        @Override
+        public int hashCode() {
+            return senderId;
+        }
+
+        public static String createDisplayName(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+                ValueType valueType) {
+            return connectionId + " (" + runtimeId.getFeedRuntimeType() + " )" + "[" + runtimeId.getPartition() + "]"
+                    + "{" + valueType + "}";
+        }
+
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        public int getSenderId() {
+            return senderId;
+        }
+    }
+
+    @Override
+    public int getMetric(int senderId) {
+        Sender sender = senders.get(senderId);
+        return getMetric(sender);
+    }
+
+    @Override
+    public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType) {
+        String displayName = Sender.createDisplayName(connectionId, runtimeId, valueType);
+        Sender sender = sendersByName.get(displayName);
+        return getMetric(sender);
+    }
+
+    private int getMetric(Sender sender) {
+        if (sender == null || statHistory.get(sender.getSenderId()) == null) {
+            return UNKNOWN;
+        }
+
+        float result = -1;
+        Series series = statHistory.get(sender.getSenderId());
+        switch (sender.mType) {
+            case AVG:
+                result = ((SeriesAvg) series).getAvg();
+                break;
+            case RATE:
+                result = ((SeriesRate) series).getRate();
+                break;
+        }
+        return (int) result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java
new file mode 100644
index 0000000..6e6ce75
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedPolicyAccessor.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A utility class to access the configuration parameters of a feed ingestion policy.
+ */
+public class FeedPolicyAccessor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** failure configuration **/
+    /** continue feed ingestion after a soft (runtime) failure **/
+    public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
+
+    /** log failed tuple to an asterixdb dataset for future reference **/
+    public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
+
+    /** continue feed ingestion after loss of one or more machines (hardware failure) **/
+    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
+
+    /** auto-start a loser feed when the asterixdb instance is restarted **/
+    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
+
+    /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
+    public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
+
+    /** flow control configuration **/
+    /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
+    public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
+
+    /** the maximum size of data (tuples) that can be spilled to disk **/
+    public static final String MAX_SPILL_SIZE_ON_DISK = "max.spill.size.on.disk";
+
+    /** discard tuples altogether if an operator cannot process incoming data at its arrival rate **/
+    public static final String DISCARD_ON_CONGESTION = "discard.on.congestion";
+
+    /** maximum fraction of ingested data that can be discarded **/
+    public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
+
+    /** maximum end-to-end delay/latency in persisting a tuple through the feed ingestion pipeline **/
+    public static final String MAX_DELAY_RECORD_PERSISTENCE = "max.delay.record.persistence";
+
+    /** rate limit the inflow of tuples in accordance with the maximum capacity of the pipeline **/
+    public static final String THROTTLING_ENABLED = "throttling.enabled";
+
+    /** elasticity **/
+    public static final String ELASTIC = "elastic";
+
+    /** statistics **/
+    public static final String TIME_TRACKING = "time.tracking";
+
+    /** logging of statistics **/
+    public static final String LOGGING_STATISTICS = "logging.statistics";
+
+    public static final long NO_LIMIT = -1;
+
+    private Map<String, String> feedPolicy;
+
+    public Map<String, String> getFeedPolicy() {
+        return feedPolicy;
+    }
+
+    public FeedPolicyAccessor(Map<String, String> feedPolicy) {
+        this.feedPolicy = feedPolicy;
+    }
+
+    public void reset(Map<String, String> feedPolicy) {
+        this.feedPolicy = feedPolicy;
+    }
+
+    /** Failure recovery/reporting **/
+
+    public boolean logDataOnSoftFailure() {
+        return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
+    }
+
+    public boolean continueOnSoftFailure() {
+        return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
+    }
+
+    public boolean continueOnHardwareFailure() {
+        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
+    }
+
+    public boolean autoRestartOnClusterReboot() {
+        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
+    }
+
+    public boolean atleastOnceSemantics() {
+        return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
+    }
+
+    /** flow control **/
+    public boolean spillToDiskOnCongestion() {
+        return getBooleanPropertyValue(SPILL_TO_DISK_ON_CONGESTION, false);
+    }
+
+    public boolean discardOnCongestion() {
+        return getMaxFractionDiscard() > 0;
+    }
+
+    public boolean throttlingEnabled() {
+        return getBooleanPropertyValue(THROTTLING_ENABLED, false);
+    }
+
+    public long getMaxSpillOnDisk() {
+        return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
+    }
+
+    public float getMaxFractionDiscard() {
+        return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
+    }
+
+    public long getMaxDelayRecordPersistence() {
+        return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, Long.MAX_VALUE);
+    }
+
+    /** Elasticity **/
+    public boolean isElastic() {
+        return getBooleanPropertyValue(ELASTIC, false);
+    }
+
+    /** Statistics **/
+    public boolean isTimeTrackingEnabled() {
+        return getBooleanPropertyValue(TIME_TRACKING, false);
+    }
+
+    /** Logging of statistics **/
+    public boolean isLoggingStatisticsEnabled() {
+        return getBooleanPropertyValue(LOGGING_STATISTICS, false);
+    }
+
+    private boolean getBooleanPropertyValue(String key, boolean defValue) { // policy flag for key, or defValue if unset
+        String v = feedPolicy.get(key);
+        return v != null ? Boolean.parseBoolean(v) : defValue; // honor the caller's default, like the long/float helpers
+    }
+
+    private long getLongPropertyValue(String key, long defValue) {
+        String v = feedPolicy.get(key);
+        return v != null ? Long.parseLong(v) : defValue;
+    }
+
+    private float getFloatPropertyValue(String key, float defValue) {
+        String v = feedPolicy.get(key);
+        return v != null ? Float.parseFloat(v) : defValue;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java
deleted file mode 100644
index cda56ae..0000000
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedReport.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package edu.uci.ics.asterix.common.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager.FeedReportMessageType;
-
-public class FeedReport implements Comparable {
-
-    private FeedConnectionId feedId;
-    private FeedReportMessageType reportType;
-    private int partition = -1;
-    private FeedRuntimeType runtimeType;
-    private long value = -1;
-    private String[] representation;
-
-    public FeedReport() {
-    }
-
-    public FeedReport(String message) {
-        representation = message.split("\\|");
-    }
-
-    public void reset(String message) {
-        representation = message.split("\\|");
-        reportType = null;
-        feedId = null;
-        runtimeType = null;
-        partition = -1;
-        value = -1;
-    }
-
-    @Override
-    public String toString() {
-        return getFeedId() + " " + getReportType() + " " + getPartition() + " " + getRuntimeType() + " " + getValue();
-    }
-
-    public FeedConnectionId getFeedId() {
-        if (feedId == null) {
-            String feedIdRep = representation[1];
-            String[] feedIdComp = feedIdRep.split(":");
-            feedId = new FeedConnectionId(feedIdComp[0], feedIdComp[1], feedIdComp[2]);
-        }
-        return feedId;
-    }
-
-    public FeedReportMessageType getReportType() {
-        if (reportType == null) {
-            reportType = FeedReportMessageType.valueOf(representation[0].toUpperCase());
-        }
-        return reportType;
-    }
-
-    public int getPartition() {
-        if (partition < 0) {
-            partition = Integer.parseInt(representation[3]);
-        }
-        return partition;
-    }
-
-    public FeedRuntimeType getRuntimeType() {
-        if (runtimeType == null) {
-            runtimeType = FeedRuntimeType.valueOf(representation[2].toUpperCase());
-        }
-        return runtimeType;
-    }
-
-    public long getValue() {
-        if (value < 0) {
-            value = Long.parseLong(representation[4]);
-        }
-        return value;
-    }
-
-    public String[] getRepresentation() {
-        return representation;
-    }
-
-    @Override
-    public int compareTo(Object o) {
-        if (!(o instanceof FeedReport)) {
-            throw new IllegalArgumentException("Incorrect operand type " + o);
-        }
-
-        FeedReport other = (FeedReport) o;
-        if (!other.getReportType().equals(getReportType())) {
-            throw new IllegalArgumentException("Incorrect operand type " + o);
-        }
-
-        int returnValue = 0;
-
-        switch (getReportType()) {
-            case CONGESTION:
-                returnValue = ranking.get(getRuntimeType()) - ranking.get(other.getRuntimeType());
-                break;
-
-            case THROUGHPUT:
-                returnValue = (int) (other.getValue() - getValue());
-                break;
-        }
-
-        return returnValue;
-    }
-
-    private static Map<FeedRuntimeType, Integer> ranking = populateRanking();
-
-    private static Map<FeedRuntimeType, Integer> populateRanking() {
-        Map<FeedRuntimeType, Integer> ranking = new HashMap<FeedRuntimeType, Integer>();
-        ranking.put(FeedRuntimeType.INGESTION, 1);
-        ranking.put(FeedRuntimeType.COMPUTE, 2);
-        ranking.put(FeedRuntimeType.STORAGE, 3);
-        ranking.put(FeedRuntimeType.COMMIT, 4);
-        return ranking;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
index 88e1db5..47f373b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntime.java
@@ -14,155 +14,57 @@
  */
 package edu.uci.ics.asterix.common.feeds;
 
-import java.nio.ByteBuffer;
-
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 
-public class FeedRuntime {
-
-    public enum FeedRuntimeType {
-        INGESTION,
-        COMPUTE,
-        STORAGE,
-        COMMIT
-    }
+public class FeedRuntime implements IFeedRuntime {
 
-    /** A unique identifier */
-    protected final FeedRuntimeId feedRuntimeId;
+    /** A unique identifier for the runtime **/
+    protected final FeedRuntimeId runtimeId;
 
-    /** The runtime state: @see FeedRuntimeState */
-    protected FeedRuntimeState runtimeState;
+    /** The output frame writer associated with the runtime **/
+    protected IFrameWriter frameWriter;
 
-    public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType) {
-        this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, partition);
-    }
+    /** The pre-processor associated with the runtime **/
+    protected FeedRuntimeInputHandler inputHandler;
 
-    public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId) {
-        this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
+    public FeedRuntime(FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter) {
+        this.runtimeId = runtimeId;
+        this.frameWriter = frameWriter;
+        this.inputHandler = inputHandler;
     }
 
-    public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType, String operandId,
-            FeedRuntimeState runtimeState) {
-        this.feedRuntimeId = new FeedRuntimeId(feedId, feedRuntimeType, operandId, partition);
-        this.runtimeState = runtimeState;
+    public void setFrameWriter(IFeedOperatorOutputSideHandler frameWriter) {
+        this.frameWriter = frameWriter;
     }
 
     @Override
-    public String toString() {
-        return feedRuntimeId + " " + "runtime state present ? " + (runtimeState != null);
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
     }
 
-    public static class FeedRuntimeState {
-
-        private ByteBuffer frame;
-        private IFrameWriter frameWriter;
-        private Exception exception;
-
-        public FeedRuntimeState(ByteBuffer frame, IFrameWriter frameWriter, Exception exception) {
-            this.frame = frame;
-            this.frameWriter = frameWriter;
-            this.exception = exception;
-        }
-
-        public ByteBuffer getFrame() {
-            return frame;
-        }
-
-        public void setFrame(ByteBuffer frame) {
-            this.frame = frame;
-        }
-
-        public IFrameWriter getFrameWriter() {
-            return frameWriter;
-        }
-
-        public void setFrameWriter(IFrameWriter frameWriter) {
-            this.frameWriter = frameWriter;
-        }
-
-        public Exception getException() {
-            return exception;
-        }
-
-        public void setException(Exception exception) {
-            this.exception = exception;
-        }
-
+    @Override
+    public IFrameWriter getFeedFrameWriter() {
+        return frameWriter;
     }
 
-    public static class FeedRuntimeId {
-
-        public static final String DEFAULT_OPERATION_ID = "N/A";
-        private final FeedRuntimeType feedRuntimeType;
-        private final String operandId;
-        private final FeedConnectionId feedId;
-        private final int partition;
-        private final int hashCode;
-
-        public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, String operandId, int partition) {
-            this.feedRuntimeType = runtimeType;
-            this.operandId = operandId;
-            this.feedId = feedId;
-            this.partition = partition;
-            this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode();
-        }
-
-        public FeedRuntimeId(FeedConnectionId feedId, FeedRuntimeType runtimeType, int partition) {
-            this.feedRuntimeType = runtimeType;
-            this.operandId = DEFAULT_OPERATION_ID;
-            this.feedId = feedId;
-            this.partition = partition;
-            this.hashCode = (feedId + "[" + partition + "]" + feedRuntimeType).hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return feedId + "[" + partition + "]" + " " + feedRuntimeType + "(" + operandId + ")";
-        }
-
-        @Override
-        public int hashCode() {
-            return hashCode;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (o instanceof FeedRuntimeId) {
-                FeedRuntimeId oid = ((FeedRuntimeId) o);
-                return oid.getFeedId().equals(feedId) && oid.getFeedRuntimeType().equals(feedRuntimeType)
-                        && oid.getOperandId().equals(operandId) && oid.getPartition() == partition;
-            }
-            return false;
-        }
-
-        public FeedRuntimeType getFeedRuntimeType() {
-            return feedRuntimeType;
-        }
-
-        public FeedConnectionId getFeedId() {
-            return feedId;
-        }
-
-        public String getOperandId() {
-            return operandId;
-        }
-
-        public int getPartition() {
-            return partition;
-        }
-
+    @Override
+    public String toString() {
+        return runtimeId.toString();
     }
 
-    public FeedRuntimeState getRuntimeState() {
-        return runtimeState;
+    @Override
+    public FeedRuntimeInputHandler getInputHandler() {
+        return inputHandler;
     }
 
-    public void setRuntimeState(FeedRuntimeState runtimeState) {
-        this.runtimeState = runtimeState;
+    public Mode getMode() {
+        return inputHandler != null ? inputHandler.getMode() : Mode.PROCESS;
     }
 
-    public FeedRuntimeId getFeedRuntimeId() {
-        return feedRuntimeId;
+    public void setMode(Mode mode) {
+        this.inputHandler.setMode(mode);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java
new file mode 100644
index 0000000..098cc50
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeId.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.Serializable;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+public class FeedRuntimeId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String DEFAULT_OPERAND_ID = "N/A";
+
+    private final FeedRuntimeType runtimeType;
+    private final int partition;
+    private final String operandId;
+
+    public FeedRuntimeId(FeedRuntimeType runtimeType, int partition, String operandId) {
+        this.runtimeType = runtimeType;
+        this.partition = partition;
+        this.operandId = operandId;
+    }
+
+    @Override
+    public String toString() {
+        return runtimeType + "[" + partition + "]" + "{" + operandId + "}";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof FeedRuntimeId)) {
+            return false;
+        }
+        FeedRuntimeId other = (FeedRuntimeId) o;
+        return (other.getFeedRuntimeType().equals(runtimeType) && other.getOperandId().equals(operandId) && other
+                .getPartition() == partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return toString().hashCode(); // toString() is built from runtimeType, partition and operandId, the fields equals() compares
+    }
+
+    public FeedRuntimeType getFeedRuntimeType() {
+        return runtimeType;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public FeedRuntimeType getRuntimeType() {
+        return runtimeType;
+    }
+
+    public String getOperandId() {
+        return operandId;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java
new file mode 100644
index 0000000..8db1c9d
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeInputHandler.java
@@ -0,0 +1,426 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.DataBucket.ContentType;
+import edu.uci.ics.asterix.common.feeds.api.IExceptionHandler;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryComponent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * Provides for error-handling and input-side buffering for a feed runtime.
+ */
+public class FeedRuntimeInputHandler implements IFrameWriter {
+
+    // Class-wide logger; final because it is never reassigned.
+    private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
+
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    // NOTE(review): feedPolicyAccessor and fpa are both assigned the same constructor argument.
+    private final FeedPolicyAccessor feedPolicyAccessor;
+    private final boolean bufferingEnabled;
+    private final IExceptionHandler exceptionHandler;
+    private final FeedFrameDiscarder discarder;
+    private final FeedFrameSpiller spiller;
+    private final FeedPolicyAccessor fpa;
+    private final IFeedManager feedManager;
+
+    // The operator that frames are ultimately delivered to; replaceable via setCoreOperator().
+    private IFrameWriter coreOperator;
+    private MonitoredBuffer mBuffer;
+    private DataBucketPool pool;
+    // Holds frames received in STALL mode; set to null once the backlog has been replayed.
+    private FrameCollection frameCollection;
+    // Current mode and the mode that was active before the last switch (see setMode()).
+    private Mode mode;
+    private Mode lastMode;
+    private boolean finished;
+    private long nProcessed;
+    private boolean throttlingEnabled;
+
+    private FrameEventCallback frameEventCallback;
+
+    /**
+     * Wires up the input-side handler for a feed runtime.
+     * Creates the spiller, discarder and exception handler, starts in PROCESS mode,
+     * obtains a bucket pool and a frame collection from the feed memory manager,
+     * and obtains and starts the monitored buffer.
+     *
+     * @param ctx
+     *            the task context, passed on to the helper components
+     * @param connectionId
+     *            the feed connection this handler belongs to
+     * @param runtimeId
+     *            the id of the runtime this handler serves
+     * @param coreOperator
+     *            the writer that frames are delivered to
+     * @param fpa
+     *            accessor for the ingestion policy
+     * @param bufferingEnabled
+     *            if false, process() forwards frames to the core operator synchronously
+     * @param fta
+     *            frame tuple accessor passed to the exception handler and the monitored buffer
+     * @param recordDesc
+     *            record descriptor passed to the exception handler and the monitored buffer
+     * @param feedManager
+     *            source of the memory manager, metric collector and message service
+     * @param nPartitions
+     *            number of partitions passed to the monitored buffer
+     * @throws IOException
+     *             declared by this constructor; NOTE(review): which call raises it is not visible here
+     */
+    public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId, IFrameWriter coreOperator,
+            FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
+            RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws IOException {
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.coreOperator = coreOperator;
+        this.bufferingEnabled = bufferingEnabled;
+        this.feedPolicyAccessor = fpa;
+        this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
+        // NOTE(review): 'this' is handed out here and below before the constructor has finished.
+        this.discarder = new FeedFrameDiscarder(ctx, connectionId, runtimeId, fpa, this);
+        this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
+        this.mode = Mode.PROCESS;
+        this.lastMode = Mode.PROCESS;
+        this.finished = false;
+        this.fpa = fpa;
+        this.feedManager = feedManager;
+        this.pool = (DataBucketPool) feedManager.getFeedMemoryManager().getMemoryComponent(
+                IFeedMemoryComponent.Type.POOL);
+        this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager().getMemoryComponent(
+                IFeedMemoryComponent.Type.COLLECTION);
+        this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
+        this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
+                feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
+                nPartitions, fpa);
+        this.mBuffer.start();
+        this.throttlingEnabled = false;
+    }
+
+    /**
+     * Routes an incoming frame according to the current mode.
+     * <ul>
+     * <li>PROCESS: if the previous mode was SPILL/POST_SPILL_DISCARD or STALL, the spilled or
+     * buffered backlog is replayed first; then the frame is processed.</li>
+     * <li>PROCESS_BACKLOG, PROCESS_SPILL: the frame is processed.</li>
+     * <li>SPILL: the frame is spilled; DISCARD, POST_SPILL_DISCARD: the frame is discarded.</li>
+     * <li>STALL: the frame is buffered for COLLECT, COMPUTE_COLLECT, COMPUTE and STORE runtimes;
+     * for any other runtime type it is dropped with a warning.</li>
+     * <li>END, FAIL: the frame is ignored with a warning.</li>
+     * </ul>
+     *
+     * @param frame
+     *            the incoming frame
+     * @throws HyracksDataException
+     *             wrapping any exception raised while handling the frame
+     */
+    @Override
+    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        try {
+            switch (mode) {
+                case PROCESS:
+                    // A switch back to PROCESS may leave a backlog behind; drain it before the new frame.
+                    switch (lastMode) {
+                        case SPILL:
+                        case POST_SPILL_DISCARD:
+                            setMode(Mode.PROCESS_SPILL);
+                            processSpilledBacklog();
+                            break;
+                        case STALL:
+                            setMode(Mode.PROCESS_BACKLOG);
+                            processBufferredBacklog();
+                            break;
+                        default:
+                            break;
+                    }
+                    process(frame);
+                    break;
+                case PROCESS_BACKLOG:
+                case PROCESS_SPILL:
+                    process(frame);
+                    break;
+                case SPILL:
+                    spill(frame);
+                    break;
+                case DISCARD:
+                case POST_SPILL_DISCARD:
+                    discard(frame);
+                    break;
+                case STALL:
+                    switch (runtimeId.getFeedRuntimeType()) {
+                        case COLLECT:
+                        case COMPUTE_COLLECT:
+                        case COMPUTE:
+                        case STORE:
+                            bufferDataUntilRecovery(frame);
+                            break;
+                        default:
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.warning("Discarding frame during " + mode + " mode " + this.runtimeId);
+                            }
+                            break;
+                    }
+                    break;
+                case END:
+                case FAIL:
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Ignoring incoming tuples in " + mode + " mode");
+                    }
+                    break;
+            }
+        } catch (Exception e) {
+            // Report through the class logger (with stack trace) rather than printing to stderr.
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "Unable to handle incoming frame in " + mode + " mode " + this.runtimeId, e);
+            }
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Holds on to a frame while the runtime is stalled.
+     * The frame is added to the frame collection (re-acquired from the memory manager if it is
+     * currently null). If no collection is available the frame is discarded; if the collection
+     * rejects the frame it is spilled or discarded, depending on the ingestion policy.
+     */
+    private void bufferDataUntilRecovery(ByteBuffer frame) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
+        }
+        if (frameCollection == null) {
+            frameCollection = (FrameCollection) feedManager.getFeedMemoryManager().getMemoryComponent(
+                    IFeedMemoryComponent.Type.COLLECTION);
+        }
+
+        // Still no collection after asking the memory manager: drop the frame.
+        if (frameCollection == null) {
+            discarder.processMessage(frame);
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Running low on memory! DISCARDING FRAME ");
+            }
+            return;
+        }
+
+        boolean buffered = frameCollection.addFrame(frame);
+        if (buffered) {
+            return;
+        }
+
+        // The collection rejected the frame: fall back to the policy's choice.
+        if (!fpa.spillToDiskOnCongestion()) {
+            discarder.processMessage(frame);
+        } else if (frame != null) {
+            spiller.processMessage(frame);
+        } // TODO handle the null-frame case under a spill policy
+    }
+
+    /**
+     * Reports congestion that this handler could not resolve.
+     * For a COMPUTE runtime a FeedCongestionMessage carrying the buffer's inflow and outflow
+     * rates and the current mode is sent through the feed message service; for any other
+     * runtime type only a warning is logged.
+     *
+     * @throws HyracksDataException
+     *             declared by this method; NOTE(review): the raising call is not visible here
+     */
+    public void reportUnresolvableCongestion() throws HyracksDataException {
+        boolean computeRuntime = this.runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE);
+        if (!computeRuntime) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unresolvable congestion at " + this.connectionId + " " + this.runtimeId);
+            }
+            return;
+        }
+
+        FeedCongestionMessage report = new FeedCongestionMessage(connectionId, runtimeId, mBuffer.getInflowRate(),
+                mBuffer.getOutflowRate(), mode);
+        feedManager.getFeedMessageService().sendMessage(report);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Congestion reported " + this.connectionId + " " + this.runtimeId);
+        }
+    }
+
+    /**
+     * Replays the frames held in the frame collection through process(), then sends an EOSD
+     * bucket to the monitored buffer, releases the collection to the memory manager and
+     * clears the reference. Does nothing beyond logging when there is no collection.
+     *
+     * @throws HyracksDataException
+     *             wrapping any exception raised during the replay
+     */
+    private void processBufferredBacklog() throws HyracksDataException {
+        try {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Processing backlog " + this.runtimeId);
+            }
+
+            if (frameCollection != null) {
+                Iterator<ByteBuffer> backlog = frameCollection.getFrameCollectionIterator();
+                while (backlog.hasNext()) {
+                    process(backlog.next());
+                    // NOTE(review): process() also increments nProcessed on its buffered path,
+                    // so replayed frames may be counted twice -- confirm this is intended.
+                    nProcessed++;
+                }
+                // NOTE(review): process() treats a null bucket from the pool as congestion;
+                // there is no such check here, so a null bucket would raise a NullPointerException.
+                DataBucket bucket = pool.getDataBucket();
+                bucket.setContentType(ContentType.EOSD);
+                bucket.setDesiredReadCount(1);
+                mBuffer.sendMessage(bucket);
+                feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+                frameCollection = null;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Replays the frames returned by the spiller through process(), then sends an EOSD
+     * bucket to the monitored buffer and resets the spiller.
+     *
+     * @throws HyracksDataException
+     *             wrapping any exception raised during the replay
+     */
+    private void processSpilledBacklog() throws HyracksDataException {
+        try {
+            Iterator<ByteBuffer> backlog = spiller.replayData();
+            while (backlog.hasNext()) {
+                process(backlog.next());
+                // NOTE(review): process() also increments nProcessed on its buffered path,
+                // so replayed frames may be counted twice -- confirm this is intended.
+                nProcessed++;
+            }
+            // NOTE(review): process() treats a null bucket from the pool as congestion;
+            // there is no such check here, so a null bucket would raise a NullPointerException.
+            DataBucket bucket = pool.getDataBucket();
+            bucket.setContentType(ContentType.EOSD);
+            bucket.setDesiredReadCount(1);
+            mBuffer.sendMessage(bucket);
+            spiller.reset();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Delivers a frame towards the core operator.
+     * Without buffering the frame is passed to the core operator directly. With buffering the
+     * frame is copied into a bucket from the pool and handed to the monitored buffer; a null
+     * frame is sent as an EOD bucket. If the pool has no bucket, the ingestion policy decides
+     * between spilling, discarding, throttling and reporting congestion.
+     * On an exception the frame is either retried after the exception handler has processed it
+     * (policy allows continuing on soft failure) or the buffer is closed and the exception is
+     * rethrown.
+     *
+     * @param frame
+     *            the frame to deliver; may be null on the buffered path
+     * @throws HyracksDataException
+     *             if processing fails and the policy does not allow continuing
+     */
+    protected void process(ByteBuffer frame) throws HyracksDataException {
+        boolean frameProcessed = false;
+        // Loops only when the exception handler hands back a frame to retry.
+        while (!frameProcessed) {
+            try {
+                if (!bufferingEnabled) {
+                    coreOperator.nextFrame(frame); // synchronous
+                    mBuffer.sendReport(frame);
+                } else {
+                    DataBucket bucket = pool.getDataBucket();
+                    if (bucket != null) {
+                        if (frame != null) {
+                            bucket.reset(frame); // created a copy here
+                            bucket.setContentType(ContentType.DATA);
+                        } else {
+                            bucket.setContentType(ContentType.EOD);
+                        }
+                        bucket.setDesiredReadCount(1);
+                        mBuffer.sendMessage(bucket);
+                        mBuffer.sendReport(frame);
+                        nProcessed++;
+                    } else {
+                        // No bucket available: apply the congestion behavior chosen by the policy.
+                        if (fpa.spillToDiskOnCongestion()) {
+                            // NOTE(review): a null frame is silently dropped on this branch.
+                            if (frame != null) {
+                                boolean spilled = spiller.processMessage(frame);
+                                if (spilled) {
+                                    setMode(Mode.SPILL);
+                                } else {
+                                    reportUnresolvableCongestion();
+                                }
+                            }
+                        } else if (fpa.discardOnCongestion()) {
+                            boolean discarded = discarder.processMessage(frame);
+                            if (!discarded) {
+                                reportUnresolvableCongestion();
+                            }
+                        } else if (fpa.throttlingEnabled()) {
+                            // NOTE(review): the frame itself is not forwarded, spilled or discarded here.
+                            setThrottlingEnabled(true);
+                        } else {
+                            reportUnresolvableCongestion();
+                        }
+
+                    }
+                }
+                frameProcessed = true;
+            } catch (Exception e) {
+                if (feedPolicyAccessor.continueOnSoftFailure()) {
+                    // The handler returns the frame to retry with, or null when nothing can be retried.
+                    // Presumably the returned frame excludes the failing tuple -- TODO confirm.
+                    frame = exceptionHandler.handleException(e, frame);
+                    if (frame == null) {
+                        frameProcessed = true;
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Encountered exception! " + e.getMessage()
+                                    + "Insufficient information, Cannot extract failing tuple");
+                        }
+                    }
+                } else {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Ingestion policy does not require recovering from tuple. Feed would terminate");
+                    }
+                    mBuffer.close(false);
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Hands the frame to the spiller. When the spiller does not accept it, the handler
+     * switches to POST_SPILL_DISCARD mode and reports unresolvable congestion.
+     */
+    private void spill(ByteBuffer frame) throws Exception {
+        if (spiller.processMessage(frame)) {
+            return;
+        }
+        // limit reached
+        setMode(Mode.POST_SPILL_DISCARD);
+        reportUnresolvableCongestion();
+    }
+
+    /**
+     * Hands the frame to the discarder and reports unresolvable congestion when the
+     * discarder does not accept it.
+     */
+    private void discard(ByteBuffer frame) throws Exception {
+        boolean limitReached = !discarder.processMessage(frame);
+        if (limitReached) {
+            reportUnresolvableCongestion();
+        }
+    }
+
+    /**
+     * Returns the current mode.
+     * NOTE(review): this read is not synchronized although setMode() is -- confirm that is acceptable.
+     */
+    public Mode getMode() {
+        return mode;
+    }
+
+    /**
+     * Switches to the given mode. A request for the mode already in effect is ignored.
+     * Otherwise the current mode is remembered as the last mode, the new mode takes effect,
+     * and a switch to END also closes this handler.
+     *
+     * @param mode
+     *            the mode to switch to
+     */
+    public synchronized void setMode(Mode mode) {
+        if (mode.equals(this.mode)) {
+            return;
+        }
+        this.lastMode = this.mode;
+        this.mode = mode;
+        if (mode.equals(Mode.END)) {
+            this.close();
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Switched from " + lastMode + " to " + mode + " " + this.runtimeId);
+        }
+    }
+
+    /**
+     * Releases the frame collection and the bucket pool to the feed memory manager and closes
+     * the monitored buffer. Monitoring is disabled on close unless the handler is in STALL mode.
+     * Does nothing when there is no monitored buffer.
+     * NOTE(review): setMode(Mode.END) also calls this method, so it may run more than once and
+     * release the same memory components again -- confirm the memory manager tolerates that.
+     */
+    @Override
+    public void close() {
+        if (mBuffer != null) {
+            boolean disableMonitoring = !this.mode.equals(Mode.STALL);
+            if (frameCollection != null) {
+                feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
+            }
+            if (pool != null) {
+                feedManager.getFeedMemoryManager().releaseMemoryComponent(pool);
+            }
+            mBuffer.close(false, disableMonitoring);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Closed input side handler for " + this.runtimeId + " disabled monitoring "
+                        + disableMonitoring + " Mode for runtime " + this.mode);
+            }
+        }
+    }
+
+    /** Returns the writer that frames are currently delivered to. */
+    public IFrameWriter getCoreOperator() {
+        return coreOperator;
+    }
+
+    /**
+     * Replaces the core operator and passes the new writer on to the monitored buffer
+     * and the frame event callback.
+     *
+     * @param coreOperator
+     *            the new writer
+     */
+    public void setCoreOperator(IFrameWriter coreOperator) {
+        this.coreOperator = coreOperator;
+        mBuffer.setFrameWriter(coreOperator);
+        frameEventCallback.setCoreOperator(coreOperator);
+    }
+
+    /** Returns the finished flag; it is false after construction and changed only via setFinished(). */
+    public boolean isFinished() {
+        return finished;
+    }
+
+    /** Sets the finished flag. */
+    public void setFinished(boolean finished) {
+        this.finished = finished;
+    }
+
+    /** Returns the counter incremented by process() on its buffered path and by the backlog replays. */
+    public long getProcessed() {
+        return nProcessed;
+    }
+
+    /** Returns the id of the runtime this handler serves. */
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    /** Opens the core operator. */
+    @Override
+    public void open() throws HyracksDataException {
+        coreOperator.open();
+    }
+
+    /** Forwards the failure notification to the core operator. */
+    @Override
+    public void fail() throws HyracksDataException {
+        coreOperator.fail();
+    }
+
+    /**
+     * Updates the number of partitions on the monitored buffer and resets it.
+     * Does nothing when there is no monitored buffer; the null check now precedes the first
+     * use of the buffer instead of following it.
+     *
+     * @param nPartitions
+     *            the new number of partitions
+     */
+    public void reset(int nPartitions) {
+        if (mBuffer == null) {
+            return;
+        }
+        mBuffer.setNumberOfPartitions(nPartitions);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Reset number of partitions to " + nPartitions + " for " + this.runtimeId);
+        }
+        mBuffer.reset();
+    }
+
+    /** Returns the feed connection this handler belongs to. */
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /** Returns the feed manager supplied at construction. */
+    public IFeedManager getFeedManager() {
+        return feedManager;
+    }
+
+    /** Returns the monitored buffer used by this handler. */
+    public MonitoredBuffer getmBuffer() {
+        return mBuffer;
+    }
+
+    /** Returns whether throttling is currently flagged as enabled. */
+    public boolean isThrottlingEnabled() {
+        return throttlingEnabled;
+    }
+
+    /**
+     * Changes the throttling flag. When the value actually changes, a
+     * ThrottlingEnabledFeedMessage is sent through the feed message service and a warning
+     * is logged; a call that repeats the current value has no effect.
+     *
+     * @param throttlingEnabled
+     *            the new value of the flag
+     */
+    public void setThrottlingEnabled(boolean throttlingEnabled) {
+        if (this.throttlingEnabled == throttlingEnabled) {
+            return;
+        }
+        this.throttlingEnabled = throttlingEnabled;
+        IFeedMessage notification = new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+        feedManager.getFeedMessageService().sendMessage(notification);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Throttling " + throttlingEnabled + " for " + this.connectionId + "[" + runtimeId + "]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
index a68f6b8..d0438f8 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeManager.java
@@ -15,90 +15,42 @@
 package edu.uci.ics.asterix.common.feeds;
 
 import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
 
 public class FeedRuntimeManager {
 
     private static Logger LOGGER = Logger.getLogger(FeedRuntimeManager.class.getName());
 
-    private final FeedConnectionId feedId;
-    private final IFeedManager feedManager;
-    private SuperFeedManager superFeedManager;
+    private final FeedConnectionId connectionId;
+    private final IFeedConnectionManager connectionManager;
     private final Map<FeedRuntimeId, FeedRuntime> feedRuntimes;
+
     private final ExecutorService executorService;
-    private FeedMessageService messageService;
-    private SocketFactory socketFactory = new SocketFactory();
-    private final LinkedBlockingQueue<String> feedReportQueue;
 
-    public FeedRuntimeManager(FeedConnectionId feedId, IFeedManager feedManager) {
-        this.feedId = feedId;
-        feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
-        executorService = Executors.newCachedThreadPool();
-        feedReportQueue = new LinkedBlockingQueue<String>();
-        this.feedManager = feedManager;
+    public FeedRuntimeManager(FeedConnectionId connectionId, IFeedConnectionManager feedConnectionManager) {
+        this.connectionId = connectionId;
+        this.feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
+        this.executorService = Executors.newCachedThreadPool();
+        this.connectionManager = feedConnectionManager;
     }
 
-    public void close(boolean closeAll) throws IOException {
-        socketFactory.close();
-
-        if (messageService != null) {
-            messageService.stop();
+    public void close() throws IOException {
+        if (executorService != null) {
+            executorService.shutdownNow();
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shut down message services for :" + feedId);
-            }
-            messageService = null;
-        }
-        if (superFeedManager != null && superFeedManager.isLocal()) {
-            superFeedManager.stop();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shut down super feed manager for :" + feedId);
-            }
-        }
-
-        if (closeAll) {
-            if (executorService != null) {
-                executorService.shutdownNow();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Shut down executor service for :" + feedId);
-                }
+                LOGGER.info("Shut down executor service for :" + connectionId);
             }
         }
     }
 
-    public void setSuperFeedManager(SuperFeedManager sfm) throws UnknownHostException, IOException, Exception {
-        this.superFeedManager = sfm;
-        if (sfm.isLocal()) {
-            sfm.start();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Started Super Feed Manager for feed :" + feedId);
-        }
-        this.messageService = new FeedMessageService(feedId, feedManager);
-        messageService.start();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Started Feed Message Service for feed :" + feedId);
-        }
-    }
-
-    public SuperFeedManager getSuperFeedManager() {
-        return superFeedManager;
-    }
-
     public FeedRuntime getFeedRuntime(FeedRuntimeId runtimeId) {
         return feedRuntimes.get(runtimeId);
     }
@@ -107,17 +59,10 @@ public class FeedRuntimeManager {
         feedRuntimes.put(runtimeId, feedRuntime);
     }
 
-    public void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
+    public synchronized void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
         feedRuntimes.remove(runtimeId);
         if (feedRuntimes.isEmpty()) {
-            synchronized (this) {
-                if (feedRuntimes.isEmpty()) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("De-registering feed");
-                    }
-                    feedManager.deregisterFeed(runtimeId.getFeedId());
-                }
-            }
+            connectionManager.deregisterFeed(connectionId);
         }
     }
 
@@ -125,114 +70,8 @@ public class FeedRuntimeManager {
         return executorService;
     }
 
-    public FeedMessageService getMessageService() {
-        return messageService;
-    }
-
-    public Socket createClientSocket(String host, int port) throws UnknownHostException, IOException {
-        return socketFactory.createClientSocket(host, port);
-    }
-
-    public Socket createClientSocket(String host, int port, long timeout) throws UnknownHostException, IOException {
-        Socket client = null;
-        boolean continueAttempt = true;
-        long startAttempt = System.currentTimeMillis();
-        long endAttempt = System.currentTimeMillis();
-        while (client == null && continueAttempt) {
-            try {
-                client = socketFactory.createClientSocket(host, port);
-            } catch (Exception e) {
-                endAttempt = System.currentTimeMillis();
-                if (endAttempt - startAttempt > timeout) {
-                    continueAttempt = false;
-                }
-            }
-        }
-        return client;
-    }
-
-    public ServerSocket createServerSocket(int port) throws IOException {
-        return socketFactory.createServerSocket(port);
-    }
-
-    private static class SocketFactory {
-
-        private final Map<SocketId, Socket> sockets = new HashMap<SocketId, Socket>();
-        private final List<ServerSocket> serverSockets = new ArrayList<ServerSocket>();
-
-        public Socket createClientSocket(String host, int port) throws UnknownHostException, IOException {
-            Socket socket = new Socket(host, port);
-            sockets.put(new SocketId(host, port), socket);
-            return socket;
-        }
-
-        public void close() throws IOException {
-            for (ServerSocket socket : serverSockets) {
-                socket.close();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Closed server socket :" + socket);
-                }
-            }
-
-            for (Entry<SocketId, Socket> entry : sockets.entrySet()) {
-                entry.getValue().close();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Closed client socket :" + entry.getKey());
-                }
-            }
-        }
-
-        public ServerSocket createServerSocket(int port) throws IOException {
-            ServerSocket socket = new ServerSocket(port);
-            serverSockets.add(socket);
-            return socket;
-        }
-
-        private static class SocketId {
-            private final String host;
-            private final int port;
-
-            public SocketId(String host, int port) {
-                this.host = host;
-                this.port = port;
-            }
-
-            public String getHost() {
-                return host;
-            }
-
-            public int getPort() {
-                return port;
-            }
-
-            @Override
-            public String toString() {
-                return host + "[" + port + "]";
-            }
-
-            @Override
-            public int hashCode() {
-                return toString().hashCode();
-            }
-
-            @Override
-            public boolean equals(Object o) {
-                if (!(o instanceof SocketId)) {
-                    return false;
-                }
-
-                return ((SocketId) o).getHost().equals(host) && ((SocketId) o).getPort() == port;
-            }
-
-        }
-    }
-
-    public FeedConnectionId getFeedId() {
-        return feedId;
-    }
-
-    public LinkedBlockingQueue<String> getFeedReportQueue() {
-        return feedReportQueue;
+    /**
+     * Returns the ids of the registered feed runtimes. The result is the key set of the
+     * underlying ConcurrentHashMap, i.e. a view of that map rather than a copy.
+     */
+    public Set<FeedRuntimeId> getFeedRuntimes() {
+        return feedRuntimes.keySet();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java
new file mode 100644
index 0000000..868bcc7
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedRuntimeReport.java
@@ -0,0 +1,5 @@
+package edu.uci.ics.asterix.common.feeds;
+
+/**
+ * Empty class: it declares no fields or methods.
+ * NOTE(review): presumably a placeholder for a per-runtime report -- confirm or remove.
+ */
+public class FeedRuntimeReport {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java
new file mode 100644
index 0000000..1edf210
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/FeedStorageStatistics.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.asterix.common.feeds;
+
+/**
+ * Holder for two storage-related values of a feed.
+ * NOTE(review): both fields are private and nothing in this class reads or writes them,
+ * so the class is presumably unfinished -- confirm.
+ */
+public class FeedStorageStatistics {
+
+    // Presumably the average delay in persisting records; units are not stated -- TODO confirm.
+    private long avgPersistenceDelay;
+    // Presumably the timestamp of the most recent intake; units are not stated -- TODO confirm.
+    private long lastIntakeTimestamp;
+    
+    
+}