You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:09 UTC

[12/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java
new file mode 100644
index 0000000..1680387
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+
+public interface IFeedMetricCollector {
+
+    public enum ValueType {
+        CPU_USAGE,
+        INFLOW_RATE,
+        OUTFLOW_RATE
+    }
+
+    public enum MetricType {
+        AVG,
+        RATE
+    }
+
+    public boolean sendReport(int senderId, int value);
+
+    public int getMetric(int senderId);
+
+    public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType);
+
+    int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
+            MetricType metricType);
+
+    public void removeReportSender(int senderId);
+
+    public void resetReportSender(int senderId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java
new file mode 100644
index 0000000..d8c0b94
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+public interface IFeedOperatorOutputSideHandler extends IFrameWriter {
+
+    public enum Type {
+        BASIC_FEED_OUTPUT_HANDLER,
+        DISTRIBUTE_FEED_OUTPUT_HANDLER,
+        COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER
+    }
+
+    public FeedId getFeedId();
+
+    public Type getType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java
new file mode 100644
index 0000000..89dc33b
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+public interface IFeedProvider {
+
+    public void subscribeFeed(FeedId sourceFeedId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java
new file mode 100644
index 0000000..73336b7
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+public interface IFeedRuntime {
+
+    public enum FeedRuntimeType {
+        INTAKE,
+        COLLECT,
+        COMPUTE_COLLECT,
+        COMPUTE,
+        STORE,
+        OTHER,
+        ETS,
+        JOIN
+    }
+
+    public enum Mode {
+        PROCESS,
+        SPILL,
+        PROCESS_SPILL,
+        DISCARD,
+        POST_SPILL_DISCARD,
+        PROCESS_BACKLOG,
+        STALL,
+        FAIL,
+        END
+    }
+
+    /**
+     * @return the unique runtime id associated with the feed runtime.
+     */
+    public FeedRuntimeId getRuntimeId();
+
+    /**
+     * @return the frame writer associated with the feed runtime.
+     */
+    public IFrameWriter getFeedFrameWriter();
+
+    public FeedRuntimeInputHandler getInputHandler();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java
new file mode 100644
index 0000000..6e85eca
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IFeedService {
+
+    public void start() throws Exception;
+
+    public void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java
new file mode 100644
index 0000000..ea476f8
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+
+public interface IFeedSubscriptionManager {
+
+    /**
+     * @param subscribableRuntime the subscribable runtime to register
+     */
+    public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime);
+
+    /**
+     * @param subscribableRuntimeId id of the subscribable runtime to deregister
+     */
+    public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
+
+    /**
+     * @param subscribableRuntimeId id of the subscribable runtime to look up
+     * @return the subscribable runtime for the given id (result for an unregistered id is not specified here)
+     */
+    public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java
new file mode 100644
index 0000000..958bf23
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+
+public interface IFeedTrackingManager {
+
+    public void submitAckReport(FeedTupleCommitAckMessage ackMessage);
+
+    public void disableAcking(FeedConnectionId connectionId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java
new file mode 100644
index 0000000..8e340b5
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+/**
+ * Represents a feed management task. The task is executed asynchronously.
+ */
+public interface IFeedWork {
+
+    public Runnable getRunnable();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java
new file mode 100644
index 0000000..4b989da
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+/**
+ * Provides a callback mechanism that is invoked for events related to
+ * the execution of a feed management task.
+ */
+public interface IFeedWorkEventListener {
+
+    /**
+     * A callback that is invoked after successful completion of a feed
+     * management task.
+     */
+    public void workCompleted(IFeedWork work);
+
+    /**
+     * A callback that is invoked after a failed execution of a feed
+     * management task.
+     * 
+     * @param e
+     *            exception encountered during execution of the task.
+     */
+    public void workFailed(IFeedWork work, Exception e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java
new file mode 100644
index 0000000..58ea396
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IFeedWorkManager {
+
+    public void submitWork(IFeedWork work, IFeedWorkEventListener listener);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java
new file mode 100644
index 0000000..0a1c1fe
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IFrameEventCallback {
+
+    public enum FrameEvent {
+        FINISHED_PROCESSING,
+        PENDING_WORK_THRESHOLD_REACHED,
+        PENDING_WORK_DONE,
+        NO_OP,
+        FINISHED_PROCESSING_SPILLAGE
+    }
+
+    public void frameEvent(FrameEvent frameEvent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java
new file mode 100644
index 0000000..88f80e4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.Map;
+
+public interface IIntakeProgressTracker {
+
+    public void configure(Map<String, String> configuration);
+
+    public void notifyIngestedTupleTimestamp(long timestamp);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java
new file mode 100644
index 0000000..4d986ab
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IMessageReceiver<T> {
+
+    public void sendMessage(T message);
+
+    public void close(boolean processPending);
+
+    public void start();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java
new file mode 100644
index 0000000..3b44cbb
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Represents a feed runtime whose output can be routed along other parallel path(s).
+ */
+public interface ISubscribableRuntime extends IFeedRuntime {
+
+    /**
+     * @param collectionRuntime the collection runtime to subscribe
+     * @throws Exception
+     */
+    public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception;
+
+    /**
+     * @param collectionRuntime the collection runtime to unsubscribe
+     * @throws Exception
+     */
+    public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception;
+
+    /**
+     * @return the {@link ISubscriberRuntime} instances reported as
+     *         subscribers of this runtime
+     */
+    public List<ISubscriberRuntime> getSubscribers();
+
+    /**
+     * @return the {@link DistributeFeedFrameWriter} associated with this runtime
+     */
+    public DistributeFeedFrameWriter getFeedFrameWriter();
+
+    /**
+     * @return the {@link RecordDescriptor} exposed by this runtime
+     */
+    public RecordDescriptor getRecordDescriptor();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java
new file mode 100644
index 0000000..abf913c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector;
+
+public interface ISubscriberRuntime {
+
+    public Map<String, String> getFeedPolicy();
+
+    public FeedFrameCollector getFrameCollector();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java
new file mode 100644
index 0000000..76593ea
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+public interface ISubscriptionProvider {
+
+    public void subscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
+
+    public void unsubscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
new file mode 100644
index 0000000..485b9f0
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
@@ -0,0 +1,6 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface ITupleTrackingFeedAdapter extends IFeedAdapter {
+
+    public void tuplePersistedTimeCallback(long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java
new file mode 100644
index 0000000..4d08ea3
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * A feed control message indicating the need to end the feed. This message is dispatched
+ * to all locations that host an operator involved in the feed pipeline.
+ */
+public class EndFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedId sourceFeedId;
+
+    private final FeedConnectionId connectionId;
+
+    private final FeedRuntimeType sourceRuntimeType;
+
+    private final boolean completeDisconnection;
+
+    private final EndMessageType endMessageType;
+
+    public enum EndMessageType {
+        DISCONNECT_FEED,
+        DISCONTINUE_SOURCE
+    }
+
+    public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, FeedId sourceFeedId,
+            boolean completeDisconnection, EndMessageType endMessageType) {
+        super(MessageType.END);
+        this.connectionId = connectionId;
+        this.sourceRuntimeType = sourceRuntimeType;
+        this.sourceFeedId = sourceFeedId;
+        this.completeDisconnection = completeDisconnection;
+        this.endMessageType = endMessageType;
+    }
+
+    @Override
+    public String toString() {
+        return MessageType.END.name() + "  " + connectionId + " [" + sourceRuntimeType + "] ";
+    }
+
+    public FeedRuntimeType getSourceRuntimeType() {
+        return sourceRuntimeType;
+    }
+
+    public FeedId getSourceFeedId() {
+        return sourceFeedId;
+    }
+
+    public boolean isCompleteDisconnection() {
+        return completeDisconnection;
+    }
+
+    public EndMessageType getEndMessageType() {
+        return endMessageType;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        return obj;
+    }
+
+    public FeedConnectionId getFeedConnectionId() {
+        return connectionId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java
new file mode 100644
index 0000000..16fb115
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.MessageConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+
+public class FeedCongestionMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private int inflowRate;
+    private int outflowRate;
+    private Mode mode;
+
+    public FeedCongestionMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, int inflowRate,
+            int outflowRate, Mode mode) {
+        super(MessageType.CONGESTION);
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.inflowRate = inflowRate;
+        this.outflowRate = outflowRate;
+        this.mode = mode;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
+        obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
+        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+        obj.put(FeedConstants.MessageConstants.INFLOW_RATE, inflowRate);
+        obj.put(FeedConstants.MessageConstants.OUTFLOW_RATE, outflowRate);
+        obj.put(FeedConstants.MessageConstants.MODE, mode);
+        return obj;
+    }
+
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    public int getInflowRate() {
+        return inflowRate;
+    }
+
+    public int getOutflowRate() {
+        return outflowRate;
+    }
+
+    public static FeedCongestionMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
+                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+                obj.getInt(FeedConstants.MessageConstants.PARTITION),
+                obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
+        Mode mode = Mode.valueOf(obj.getString(MessageConstants.MODE));
+        return new FeedCongestionMessage(connectionId, runtimeId,
+                obj.getInt(FeedConstants.MessageConstants.INFLOW_RATE),
+                obj.getInt(FeedConstants.MessageConstants.OUTFLOW_RATE), mode);
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public Mode getMode() {
+        return mode;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java
new file mode 100644
index 0000000..62e774c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.hyracks.api.dataflow.value.JSONSerializable;
+
+/**
+ * Common base class for feed messages. It holds the {@link MessageType} that a
+ * concrete message hands to the constructor and exposes it through
+ * {@link #getMessageType()}; JSON serialization is left to the subclasses.
+ */
+public abstract class FeedMessage implements IFeedMessage, JSONSerializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Kind of this message; fixed at construction time. */
+    protected final MessageType messageType;
+
+    /**
+     * @param type
+     *            the kind of message being created
+     */
+    public FeedMessage(MessageType type) {
+        this.messageType = type;
+    }
+
+    /**
+     * @return the kind of message given at construction time
+     */
+    @Override
+    public MessageType getMessageType() {
+        return this.messageType;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java
new file mode 100644
index 0000000..51a950b
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+
+/**
+ * Queues feed messages produced on a node and forwards them, one JSON document
+ * per line, over a socket to the host and port given at construction time (the
+ * cluster controller address and the feed central manager port taken from
+ * {@link AsterixFeedProperties}).
+ * <p>
+ * {@link #sendMessage(IFeedMessage)} only enqueues; the actual socket write is
+ * performed by a single background thread started by {@link #start()}.
+ */
+public class FeedMessageService implements IFeedMessageService {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
+
+    private final LinkedBlockingQueue<String> inbox;
+    private final FeedMessageHandler mesgHandler;
+    private final String nodeId;
+    private final ExecutorService executor;
+
+    /**
+     * @param feedProperties
+     *            supplies the port of the feed central manager
+     * @param nodeId
+     *            id of the local node; stamped onto every outgoing message
+     * @param ccClusterIp
+     *            host the messages are sent to
+     */
+    public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
+        this.inbox = new LinkedBlockingQueue<String>();
+        this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
+        this.nodeId = nodeId;
+        this.executor = Executors.newSingleThreadExecutor();
+    }
+
+    /**
+     * Starts the background sender. The socket is opened by the sender thread,
+     * so a connection failure is logged there rather than thrown from here.
+     */
+    public void start() throws Exception {
+        executor.execute(mesgHandler);
+    }
+
+    /**
+     * Interrupts the background sender and closes its socket.
+     */
+    public void stop() {
+        // Taking the handler's lock keeps the interrupt from being issued
+        // while a message is in the middle of being written.
+        synchronized (mesgHandler.getLock()) {
+            executor.shutdownNow();
+        }
+        mesgHandler.stop();
+    }
+
+    /**
+     * Serializes the message to JSON, adds the node id and the message type,
+     * and enqueues the result for the background sender. A message that cannot
+     * be serialized is logged and dropped.
+     */
+    @Override
+    public void sendMessage(IFeedMessage message) {
+        try {
+            JSONObject obj = message.toJSON();
+            obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
+            obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
+            // NOTE(review): the queue is unbounded and nothing drains it once
+            // the sender thread has exited; confirm this is acceptable.
+            inbox.add(obj.toString());
+        } catch (JSONException jse) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "JSON Exception in parsing message " + message, jse);
+            }
+        }
+    }
+
+    /**
+     * Background task that connects to the target host and writes every queued
+     * message followed by a newline until it is interrupted or a write fails.
+     */
+    private static class FeedMessageHandler implements Runnable {
+
+        private static final byte[] EOL = "\n".getBytes();
+
+        private final LinkedBlockingQueue<String> inbox;
+        private final String host;
+        private final int port;
+        private final Object lock = new Object();
+
+        // Assigned by the sender thread in run() and read by whichever thread
+        // calls stop(), hence volatile.
+        private volatile Socket cfmSocket;
+
+        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
+            this.inbox = inbox;
+            this.host = host;
+            this.port = port;
+        }
+
+        @Override
+        public void run() {
+            try {
+                // The constructor either returns a connected socket or throws,
+                // so no null check is needed afterwards.
+                Socket socket = new Socket(host, port);
+                cfmSocket = socket;
+                while (true) {
+                    String message = inbox.take();
+                    synchronized (lock) {
+                        // NOTE(review): getBytes() uses the platform default
+                        // charset; confirm the receiver decodes the same way.
+                        socket.getOutputStream().write(message.getBytes());
+                        socket.getOutputStream().write(EOL);
+                    }
+                }
+            } catch (InterruptedException ie) {
+                // Expected path on shutdown: stop() interrupts this thread via
+                // shutdownNow(). Restore the flag and leave quietly.
+                Thread.currentThread().interrupt();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Feed message handler interrupted");
+                }
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Exception in handling incoming feed messages", e);
+                }
+            } finally {
+                stop();
+            }
+
+        }
+
+        /**
+         * Closes the socket if one was opened. Safe to call more than once.
+         */
+        public void stop() {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Stopping feed message handler");
+            }
+            Socket socket = cfmSocket;
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Exception in closing socket " + e.getMessage());
+                    }
+                }
+            }
+        }
+
+        /**
+         * @return the monitor held while a message is being written
+         */
+        public Object getLock() {
+            return lock;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java
new file mode 100644
index 0000000..128803a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.MessageConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * Feed message that reports a single integer measurement of the given
+ * {@link ValueType} for one feed runtime of a feed connection.
+ */
+public class FeedReportMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final ValueType valueType;
+    private int value;
+
+    public FeedReportMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType, int value) {
+        super(MessageType.FEED_REPORT);
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.valueType = valueType;
+        this.value = value;
+    }
+
+    /**
+     * Replaces the reported value so the same message instance can be reused.
+     */
+    public void reset(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Writes the message type, feed connection, runtime type, partition, value
+     * type and value. The operand id of the runtime is not written.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
+        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+        // Stored by name so that read() can resolve it with ValueType.valueOf.
+        obj.put(FeedConstants.MessageConstants.VALUE_TYPE, valueType.name());
+        obj.put(FeedConstants.MessageConstants.VALUE, value);
+        return obj;
+    }
+
+    /**
+     * Rebuilds a message from the JSON form produced by {@link #toJSON()}.
+     * The operand id of the runtime is not part of that form, so the rebuilt
+     * runtime id carries {@code NOT_APPLICABLE} instead.
+     *
+     * @throws JSONException
+     *             if a key is missing or a value has the wrong type
+     */
+    public static FeedReportMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
+                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+                obj.getInt(FeedConstants.MessageConstants.PARTITION), FeedConstants.MessageConstants.NOT_APPLICABLE);
+        ValueType type = ValueType.valueOf(obj.getString(MessageConstants.VALUE_TYPE));
+        // toJSON() stores the value as a number, so read it as one; a bad
+        // value then surfaces as the declared JSONException.
+        int value = obj.getInt(MessageConstants.VALUE);
+        return new FeedReportMessage(connectionId, runtimeId, type, value);
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public void setValue(int value) {
+        this.value = value;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    public ValueType getValueType() {
+        return valueType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
new file mode 100644
index 0000000..cacfeb2
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+/**
+ * Feed message carrying commit acknowledgements for an intake partition of a
+ * feed connection: a base value plus a byte array of acks. In JSON form the
+ * byte array travels as a Base64 string.
+ */
+public class FeedTupleCommitAckMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private int intakePartition;
+    private int base;
+    private byte[] commitAcks;
+
+    public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
+        super(MessageType.COMMIT_ACK);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    /**
+     * Overwrites the mutable part of the message so the instance can be reused.
+     */
+    public void reset(int intakePartition, int base, byte[] commitAcks) {
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    /**
+     * Serializes the message; the ack bytes are written Base64-encoded.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = new JSONObject();
+        json.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        json.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        json.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        json.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        json.put(FeedConstants.MessageConstants.BASE, base);
+        json.put(FeedConstants.MessageConstants.COMMIT_ACKS, DatatypeConverter.printBase64Binary(commitAcks));
+        return json;
+    }
+
+    /**
+     * Rebuilds a message from the JSON form produced by {@link #toJSON()}.
+     */
+    public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
+        String dataverse = obj.getString(FeedConstants.MessageConstants.DATAVERSE);
+        String feedName = obj.getString(FeedConstants.MessageConstants.FEED);
+        FeedId feedId = new FeedId(dataverse, feedName);
+        String dataset = obj.getString(FeedConstants.MessageConstants.DATASET);
+        FeedConnectionId connection = new FeedConnectionId(feedId, dataset);
+        int partition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        int ackBase = obj.getInt(FeedConstants.MessageConstants.BASE);
+        byte[] acks = DatatypeConverter.parseBase64Binary(obj
+                .getString(FeedConstants.MessageConstants.COMMIT_ACKS));
+        return new FeedTupleCommitAckMessage(connection, partition, ackBase, acks);
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    public int getIntakePartition() {
+        return this.intakePartition;
+    }
+
+    public int getBase() {
+        return this.base;
+    }
+
+    public void setBase(int base) {
+        this.base = base;
+    }
+
+    /**
+     * @return the internal ack array itself, not a copy
+     */
+    public byte[] getCommitAcks() {
+        return this.commitAcks;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
new file mode 100644
index 0000000..663a629
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+/**
+ * Feed message of type {@code COMMIT_ACK_RESPONSE}. For an intake partition of
+ * a feed connection it carries the value stored as {@code maxWindowAcked}.
+ */
+public class FeedTupleCommitResponseMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final int intakePartition;
+    private final int maxWindowAcked;
+
+    public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
+        super(MessageType.COMMIT_ACK_RESPONSE);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.maxWindowAcked = maxWindowAcked;
+    }
+
+    /**
+     * Writes the message type, feed connection, intake partition and
+     * {@code maxWindowAcked}.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
+        return obj;
+    }
+
+    /**
+     * Rebuilds a message from the JSON form produced by {@link #toJSON()}.
+     *
+     * @throws JSONException
+     *             if a key is missing or a value has the wrong type
+     */
+    public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
+        return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @return the intake partition this response refers to; the field is
+     *         serialized by {@link #toJSON()} and is now readable as well
+     */
+    public int getIntakePartition() {
+        return intakePartition;
+    }
+
+    public int getMaxWindowAcked() {
+        return maxWindowAcked;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java
new file mode 100644
index 0000000..32c35f3
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+
+/**
+ * Feed message of type {@code NODE_REPORT} carrying three node-level figures:
+ * CPU load, used heap and the number of runtimes.
+ */
+public class NodeReportMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private double cpuLoad;
+    private double usedHeap;
+    private int nRuntimes;
+
+    public NodeReportMessage(float cpuLoad, long usedHeap, int nRuntimes) {
+        super(IFeedMessage.MessageType.NODE_REPORT);
+        this.usedHeap = usedHeap;
+        this.cpuLoad = cpuLoad;
+        this.nRuntimes = nRuntimes;
+    }
+
+    /**
+     * Overwrites all three figures so the same instance can be reused.
+     */
+    public void reset(double cpuLoad, double usedHeap, int nRuntimes) {
+        this.cpuLoad = cpuLoad;
+        this.usedHeap = usedHeap;
+        this.nRuntimes = nRuntimes;
+    }
+
+    /**
+     * Writes the message type followed by the three figures.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        // Included so the JSON form is self-describing even when it is not
+        // sent through a service that stamps the type onto it.
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.CPU_LOAD, cpuLoad);
+        obj.put(FeedConstants.MessageConstants.HEAP_USAGE, usedHeap);
+        obj.put(FeedConstants.MessageConstants.N_RUNTIMES, nRuntimes);
+        return obj;
+    }
+
+    public double getCpuLoad() {
+        return cpuLoad;
+    }
+
+    public double getUsedHeap() {
+        return usedHeap;
+    }
+
+    public int getnRuntimes() {
+        return nRuntimes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java
new file mode 100644
index 0000000..7acc20f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * A feed control message indicating the need to scale in a stage of the feed
+ * ingestion pipeline. It names the runtime type concerned and carries the
+ * current and the reduced cardinality.
+ * Currently, scaling-in of the compute stage is supported.
+ */
+public class ScaleInReportMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+
+    private final FeedRuntimeType runtimeType;
+
+    private int currentCardinality;
+
+    // The spelling is kept as is: the accessors derived from it are public API.
+    private int reducedCardinaliy;
+
+    public ScaleInReportMessage(FeedConnectionId connectionId, FeedRuntimeType runtimeType, int currentCardinality,
+            int reducedCardinaliy) {
+        super(MessageType.SCALE_IN_REQUEST);
+        this.connectionId = connectionId;
+        this.runtimeType = runtimeType;
+        this.currentCardinality = currentCardinality;
+        this.reducedCardinaliy = reducedCardinaliy;
+    }
+
+    /**
+     * Rebuilds a message from the JSON form produced by {@link #toJSON()}.
+     */
+    public static ScaleInReportMessage read(JSONObject obj) throws JSONException {
+        String dataverse = obj.getString(FeedConstants.MessageConstants.DATAVERSE);
+        String feedName = obj.getString(FeedConstants.MessageConstants.FEED);
+        FeedId feedId = new FeedId(dataverse, feedName);
+        String dataset = obj.getString(FeedConstants.MessageConstants.DATASET);
+        FeedConnectionId connection = new FeedConnectionId(feedId, dataset);
+        FeedRuntimeType type = FeedRuntimeType.valueOf(obj.getString(FeedConstants.MessageConstants.RUNTIME_TYPE));
+        int current = obj.getInt(FeedConstants.MessageConstants.CURRENT_CARDINALITY);
+        int reduced = obj.getInt(FeedConstants.MessageConstants.REDUCED_CARDINALITY);
+        return new ScaleInReportMessage(connection, type, current, reduced);
+    }
+
+    /**
+     * Writes the message type, feed connection, runtime type and both
+     * cardinalities.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = new JSONObject();
+        json.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        json.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        json.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        json.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeType);
+        json.put(FeedConstants.MessageConstants.CURRENT_CARDINALITY, currentCardinality);
+        json.put(FeedConstants.MessageConstants.REDUCED_CARDINALITY, reducedCardinaliy);
+        return json;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append(MessageType.SCALE_IN_REQUEST.name()).append("  ").append(connectionId);
+        text.append(" [").append(runtimeType).append("] ");
+        text.append(" currentCardinality ").append(currentCardinality);
+        text.append(" reducedCardinality ").append(reducedCardinaliy);
+        return text.toString();
+    }
+
+    /**
+     * Overwrites both cardinalities so the same instance can be reused.
+     */
+    public void reset(int currentCardinality, int reducedCardinaliy) {
+        this.currentCardinality = currentCardinality;
+        this.reducedCardinaliy = reducedCardinaliy;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    public FeedRuntimeType getRuntimeType() {
+        return this.runtimeType;
+    }
+
+    public int getCurrentCardinality() {
+        return this.currentCardinality;
+    }
+
+    public void setCurrentCardinality(int currentCardinality) {
+        this.currentCardinality = currentCardinality;
+    }
+
+    public int getReducedCardinaliy() {
+        return this.reducedCardinaliy;
+    }
+
+    public void setReducedCardinaliy(int reducedCardinaliy) {
+        this.reducedCardinaliy = reducedCardinaliy;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java
new file mode 100644
index 0000000..502eb98
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.MessageConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+/**
+ * A feed control message sent from a storage runtime of a feed pipeline to
+ * report the intake timestamp corresponding to the last persisted tuple. It
+ * also carries the average persistence delay, a flag telling whether that
+ * delay is within its limit, and the storage and intake partition numbers.
+ */
+public class StorageReportFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final int partition;
+    private long lastPersistedTupleIntakeTimestamp;
+    private boolean persistenceDelayWithinLimit;
+    private long averageDelay;
+    private int intakePartition;
+
+    public StorageReportFeedMessage(FeedConnectionId connectionId, int partition,
+            long lastPersistedTupleIntakeTimestamp, boolean persistenceDelayWithinLimit, long averageDelay,
+            int intakePartition) {
+        super(MessageType.STORAGE_REPORT);
+        this.connectionId = connectionId;
+        this.partition = partition;
+        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+        this.averageDelay = averageDelay;
+        this.intakePartition = intakePartition;
+    }
+
+    /**
+     * Rebuilds a message from the JSON form produced by {@link #toJSON()}.
+     */
+    public static StorageReportFeedMessage read(JSONObject obj) throws JSONException {
+        String dataverse = obj.getString(MessageConstants.DATAVERSE);
+        String feedName = obj.getString(MessageConstants.FEED);
+        FeedId feedId = new FeedId(dataverse, feedName);
+        FeedConnectionId connection = new FeedConnectionId(feedId, obj.getString(MessageConstants.DATASET));
+        int storagePartition = obj.getInt(MessageConstants.PARTITION);
+        long intakeTimestamp = obj.getLong(MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP);
+        boolean withinLimit = obj.getBoolean(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT);
+        long delay = obj.getLong(MessageConstants.AVERAGE_PERSISTENCE_DELAY);
+        int sourcePartition = obj.getInt(MessageConstants.INTAKE_PARTITION);
+        return new StorageReportFeedMessage(connection, storagePartition, intakeTimestamp, withinLimit, delay,
+                sourcePartition);
+    }
+
+    /**
+     * Writes the message type, feed connection, timestamp, delay information
+     * and both partition numbers.
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject json = new JSONObject();
+        json.put(MessageConstants.MESSAGE_TYPE, messageType.name());
+        json.put(MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        json.put(MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        json.put(MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP, lastPersistedTupleIntakeTimestamp);
+        json.put(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT, persistenceDelayWithinLimit);
+        json.put(MessageConstants.AVERAGE_PERSISTENCE_DELAY, averageDelay);
+        json.put(MessageConstants.PARTITION, partition);
+        json.put(MessageConstants.INTAKE_PARTITION, intakePartition);
+        return json;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append(messageType.name()).append(" ").append(connectionId);
+        text.append(" [").append(lastPersistedTupleIntakeTimestamp).append("] ");
+        return text.toString();
+    }
+
+    /**
+     * Overwrites the timestamp and delay information so the same instance can
+     * be reused; connection and partitions stay as they are.
+     */
+    public void reset(long lastPersistedTupleIntakeTimestamp, boolean delayWithinLimit, long averageDelay) {
+        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+        this.persistenceDelayWithinLimit = delayWithinLimit;
+        this.averageDelay = averageDelay;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    public int getPartition() {
+        return this.partition;
+    }
+
+    public int getIntakePartition() {
+        return this.intakePartition;
+    }
+
+    public long getLastPersistedTupleIntakeTimestamp() {
+        return this.lastPersistedTupleIntakeTimestamp;
+    }
+
+    public boolean isPersistenceDelayWithinLimit() {
+        return this.persistenceDelayWithinLimit;
+    }
+
+    public void setPersistenceDelayWithinLimit(boolean persistenceDelayWithinLimit) {
+        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+    }
+
+    public long getAverageDelay() {
+        return this.averageDelay;
+    }
+
+    public void setAverageDelay(long averageDelay) {
+        this.averageDelay = averageDelay;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
new file mode 100644
index 0000000..4bb750b
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * A feed control message of type {@code THROTTLING_ENABLED}. It carries the id of a feed
+ * connection together with the id of a feed runtime (runtime type, partition and operand id).
+ * NOTE(review): judging by the message type, it reports that throttling has been enabled at
+ * the identified runtime -- confirm against the sender.
+ */
+public class ThrottlingEnabledFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Feed connection this message refers to. */
+    private final FeedConnectionId connectionId;
+
+    /** Feed runtime this message refers to. */
+    private final FeedRuntimeId runtimeId;
+
+    /**
+     * Creates a message of type {@code THROTTLING_ENABLED}.
+     *
+     * @param connectionId
+     *            feed connection the message refers to
+     * @param runtimeId
+     *            feed runtime the message refers to
+     */
+    public ThrottlingEnabledFeedMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId) {
+        super(MessageType.THROTTLING_ENABLED);
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+    }
+
+    /**
+     * @return the message type name followed by the connection id and the runtime id
+     */
+    @Override
+    public String toString() {
+        // Report this message's own type; the text previously claimed MessageType.END.
+        return MessageType.THROTTLING_ENABLED.name() + "  " + connectionId + " [" + runtimeId + "] ";
+    }
+
+    /**
+     * Serializes the message type, the connection id (dataverse, feed, dataset) and the
+     * runtime id (runtime type, operand id, partition) into a JSON object. The keys written
+     * here are the ones {@link #read(JSONObject)} consumes.
+     *
+     * @return the JSON representation of this message
+     * @throws JSONException
+     *             propagated from {@code JSONObject.put}
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
+        obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
+        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+        return obj;
+    }
+
+    /**
+     * @return the feed connection this message refers to
+     */
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @return the feed runtime this message refers to
+     */
+    public FeedRuntimeId getFeedRuntimeId() {
+        return runtimeId;
+    }
+
+    /**
+     * Rebuilds a {@link ThrottlingEnabledFeedMessage} from a JSON object holding the
+     * dataverse, feed, dataset, runtime type, partition and operand id entries.
+     *
+     * @param obj
+     *            JSON object the message fields are read from
+     * @return a new message built from the values read
+     * @throws JSONException
+     *             propagated from the {@code JSONObject} accessors
+     */
+    public static ThrottlingEnabledFeedMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        // The runtime type is stored by name and mapped back to the enum constant here.
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
+                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+                obj.getInt(FeedConstants.MessageConstants.PARTITION),
+                obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
+        return new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java
new file mode 100644
index 0000000..0a03d79
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.parse;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * An {@link ITupleParser} that can additionally be configured from a map of
+ * string properties.
+ */
+public interface IAsterixTupleParser extends ITupleParser {
+
+    /**
+     * Hands the parser its configuration.
+     *
+     * @param configuration
+     *            string-keyed, string-valued configuration entries
+     */
+    void configure(Map<String, String> configuration);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java
new file mode 100644
index 0000000..e7fdc74
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.asterix.common.parse;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Contract for a policy object that receives tuples one at a time and is bound to an
+ * {@link IFrameWriter}. NOTE(review): presumably the policy decides when accumulated
+ * tuples are pushed to the writer -- confirm against the implementations.
+ */
+public interface ITupleForwardPolicy {
+
+    /** Configuration key: {@code "parser-policy"}. */
+    String PARSER_POLICY = "parser-policy";
+
+    /** The kinds of forward policy an implementation can report via {@link #getType()}. */
+    enum TupleForwardPolicyType {
+        FRAME_FULL,
+        COUNTER_TIMER_EXPIRED,
+        RATE_CONTROLLED
+    }
+
+    /**
+     * Hands the policy its configuration.
+     *
+     * @param configuration
+     *            string-keyed, string-valued configuration entries
+     */
+    void configure(Map<String, String> configuration);
+
+    /**
+     * Binds the policy to a task context and a frame writer.
+     *
+     * @param ctx
+     *            the Hyracks task context
+     * @param frameWriter
+     *            the frame writer associated with this policy
+     * @throws HyracksDataException
+     *             if initialization fails
+     */
+    void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
+
+    /**
+     * @return the type of this policy
+     */
+    TupleForwardPolicyType getType();
+
+    /**
+     * Offers one tuple to the policy.
+     *
+     * @param tb
+     *            builder holding the tuple
+     * @throws HyracksDataException
+     *             if the tuple cannot be handled
+     */
+    void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
+
+    /**
+     * Closes the policy.
+     *
+     * @throws HyracksDataException
+     *             if closing fails
+     */
+    void close() throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java
new file mode 100644
index 0000000..652139c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.asterix.common.parse;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Contract for a parser policy that is configured from a property map, bound to an
+ * {@link IFrameWriter} and then fed tuples one at a time.
+ */
+public interface ITupleParserPolicy {
+
+    /** The kinds of parser policy an implementation can report via {@link #getType()}. */
+    enum TupleParserPolicy {
+        FRAME_FULL,
+        TIME_COUNT_ELAPSED,
+        RATE_CONTROLLED
+    }
+
+    /**
+     * @return the type of this policy
+     */
+    TupleParserPolicy getType();
+
+    /**
+     * Hands the policy its configuration.
+     *
+     * @param configuration
+     *            string-keyed, string-valued configuration entries
+     * @throws HyracksDataException
+     *             if the configuration cannot be applied
+     */
+    void configure(Map<String, String> configuration) throws HyracksDataException;
+
+    /**
+     * Binds the policy to a task context and a frame writer.
+     *
+     * @param ctx
+     *            the Hyracks task context
+     * @param writer
+     *            the frame writer associated with this policy
+     * @throws HyracksDataException
+     *             if initialization fails
+     */
+    void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException;
+
+    /**
+     * Offers one tuple to the policy.
+     *
+     * @param tb
+     *            builder holding the tuple
+     * @throws HyracksDataException
+     *             if the tuple cannot be handled
+     */
+    void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
+
+    /**
+     * Closes the policy.
+     *
+     * @throws HyracksDataException
+     *             if closing fails
+     */
+    void close() throws HyracksDataException;
+}