You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:09 UTC
[12/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java
new file mode 100644
index 0000000..1680387
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedMetricCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+
+public interface IFeedMetricCollector {
+
+ public enum ValueType {
+ CPU_USAGE,
+ INFLOW_RATE,
+ OUTFLOW_RATE
+ }
+
+ public enum MetricType {
+ AVG,
+ RATE
+ }
+
+ public boolean sendReport(int senderId, int value);
+
+ public int getMetric(int senderId);
+
+ public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType);
+
+ int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
+ MetricType metricType);
+
+ public void removeReportSender(int senderId);
+
+ public void resetReportSender(int senderId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java
new file mode 100644
index 0000000..d8c0b94
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedOperatorOutputSideHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+public interface IFeedOperatorOutputSideHandler extends IFrameWriter {
+
+ public enum Type {
+ BASIC_FEED_OUTPUT_HANDLER,
+ DISTRIBUTE_FEED_OUTPUT_HANDLER,
+ COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER
+ }
+
+ public FeedId getFeedId();
+
+ public Type getType();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java
new file mode 100644
index 0000000..89dc33b
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedProvider.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+public interface IFeedProvider {
+
+ public void subscribeFeed(FeedId sourceDeedId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java
new file mode 100644
index 0000000..73336b7
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedRuntime.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeInputHandler;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+
+public interface IFeedRuntime {
+
+ public enum FeedRuntimeType {
+ INTAKE,
+ COLLECT,
+ COMPUTE_COLLECT,
+ COMPUTE,
+ STORE,
+ OTHER,
+ ETS,
+ JOIN
+ }
+
+ public enum Mode {
+ PROCESS,
+ SPILL,
+ PROCESS_SPILL,
+ DISCARD,
+ POST_SPILL_DISCARD,
+ PROCESS_BACKLOG,
+ STALL,
+ FAIL,
+ END
+ }
+
+ /**
+ * @return the unique runtime id associated with the feedRuntime
+ */
+ public FeedRuntimeId getRuntimeId();
+
+ /**
+ * @return the frame writer associated with the feed runtime.
+ */
+ public IFrameWriter getFeedFrameWriter();
+
+ public FeedRuntimeInputHandler getInputHandler();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java
new file mode 100644
index 0000000..6e85eca
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedService.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IFeedService {
+
+ public void start() throws Exception;
+
+ public void stop();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java
new file mode 100644
index 0000000..ea476f8
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedSubscriptionManager.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+
+public interface IFeedSubscriptionManager {
+
+ /**
+ * @param subscribableRuntime the subscribable runtime to register with this manager
+ */
+ public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime);
+
+ /**
+ * @param subscribableRuntimeId id of the subscribable runtime to deregister
+ */
+ public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
+
+ /**
+ * @param subscribableRuntimeId id of the subscribable runtime to look up
+ * @return the subscribable runtime for the given id (result for an unknown id is implementation-defined; confirm)
+ */
+ public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java
new file mode 100644
index 0000000..958bf23
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedTrackingManager.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+
+public interface IFeedTrackingManager {
+
+ public void submitAckReport(FeedTupleCommitAckMessage ackMessage);
+
+ public void disableAcking(FeedConnectionId connectionId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java
new file mode 100644
index 0000000..8e340b5
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWork.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+/**
+ * Represents a feed management task. The task is executed asynchronously.
+ */
+public interface IFeedWork {
+
+ public Runnable getRunnable();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java
new file mode 100644
index 0000000..4b989da
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkEventListener.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+/**
+ * Provides a callback mechanism that is invoked for events related to
+ * the execution of a feed management task.
+ */
+public interface IFeedWorkEventListener {
+
+ /**
+ * A callback that is invoked after successful completion of a feed
+ * management task.
+ */
+ public void workCompleted(IFeedWork work);
+
+ /**
+ * A callback that is invoked after a failed execution of a feed
+ * management task.
+ *
+ * @param e
+ * exception encountered during execution of the task.
+ */
+ public void workFailed(IFeedWork work, Exception e);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java
new file mode 100644
index 0000000..58ea396
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFeedWorkManager.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IFeedWorkManager {
+
+ public void submitWork(IFeedWork work, IFeedWorkEventListener listener);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java
new file mode 100644
index 0000000..0a1c1fe
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IFrameEventCallback.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IFrameEventCallback {
+
+ public enum FrameEvent {
+ FINISHED_PROCESSING,
+ PENDING_WORK_THRESHOLD_REACHED,
+ PENDING_WORK_DONE,
+ NO_OP,
+ FINISHED_PROCESSING_SPILLAGE
+ }
+
+ public void frameEvent(FrameEvent frameEvent);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java
new file mode 100644
index 0000000..88f80e4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IIntakeProgressTracker.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.Map;
+
+public interface IIntakeProgressTracker {
+
+ public void configure(Map<String, String> configuration);
+
+ public void notifyIngestedTupleTimestamp(long timestamp);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java
new file mode 100644
index 0000000..4d986ab
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/IMessageReceiver.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface IMessageReceiver<T> {
+
+ public void sendMessage(T message);
+
+ public void close(boolean processPending);
+
+ public void start();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java
new file mode 100644
index 0000000..3b44cbb
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscribableRuntime.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.feeds.CollectionRuntime;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Represents a feed runtime whose output can be routed along other parallel path(s).
+ */
+public interface ISubscribableRuntime extends IFeedRuntime {
+
+ /**
+ * @param collectionRuntime the collection runtime to subscribe to this runtime
+ * @throws Exception
+ */
+ public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception;
+
+ /**
+ * @param collectionRuntime the collection runtime to unsubscribe from this runtime
+ * @throws Exception
+ */
+ public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception;
+
+ /**
+ * @return the subscribers of this runtime, as a list of
+ * {@link ISubscriberRuntime} instances
+ */
+ public List<ISubscriberRuntime> getSubscribers();
+
+ /**
+ * @return the {@link DistributeFeedFrameWriter} associated with this feed runtime
+ */
+ public DistributeFeedFrameWriter getFeedFrameWriter();
+
+ /**
+ * @return the {@link RecordDescriptor} of this runtime (presumably describing its output records; confirm)
+ */
+ public RecordDescriptor getRecordDescriptor();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java
new file mode 100644
index 0000000..abf913c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriberRuntime.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.api;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.FeedFrameCollector;
+
+public interface ISubscriberRuntime {
+
+ public Map<String, String> getFeedPolicy();
+
+ public FeedFrameCollector getFrameCollector();
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java
new file mode 100644
index 0000000..76593ea
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ISubscriptionProvider.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+public interface ISubscriptionProvider {
+
+ public void subscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
+
+ public void unsubscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
new file mode 100644
index 0000000..485b9f0
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/api/ITupleTrackingFeedAdapter.java
@@ -0,0 +1,6 @@
+package edu.uci.ics.asterix.common.feeds.api;
+
+public interface ITupleTrackingFeedAdapter extends IFeedAdapter {
+
+ public void tuplePersistedTimeCallback(long timestamp);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java
new file mode 100644
index 0000000..4d08ea3
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/EndFeedMessage.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * A feed control message indicating the need to end the feed. This message is dispatched
+ * to all locations that host an operator involved in the feed pipeline.
+ */
+public class EndFeedMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedId sourceFeedId;
+
+ private final FeedConnectionId connectionId;
+
+ private final FeedRuntimeType sourceRuntimeType;
+
+ private final boolean completeDisconnection;
+
+ private final EndMessageType endMessageType;
+
+ public enum EndMessageType {
+ DISCONNECT_FEED,
+ DISCONTINUE_SOURCE
+ }
+
+ public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, FeedId sourceFeedId,
+ boolean completeDisconnection, EndMessageType endMessageType) {
+ super(MessageType.END);
+ this.connectionId = connectionId;
+ this.sourceRuntimeType = sourceRuntimeType;
+ this.sourceFeedId = sourceFeedId;
+ this.completeDisconnection = completeDisconnection;
+ this.endMessageType = endMessageType;
+ }
+
+ @Override
+ public String toString() {
+ return MessageType.END.name() + " " + connectionId + " [" + sourceRuntimeType + "] ";
+ }
+
+ public FeedRuntimeType getSourceRuntimeType() {
+ return sourceRuntimeType;
+ }
+
+ public FeedId getSourceFeedId() {
+ return sourceFeedId;
+ }
+
+ public boolean isCompleteDisconnection() {
+ return completeDisconnection;
+ }
+
+ public EndMessageType getEndMessageType() {
+ return endMessageType;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ return obj;
+ }
+
+ public FeedConnectionId getFeedConnectionId() {
+ return connectionId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java
new file mode 100644
index 0000000..16fb115
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedCongestionMessage.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.MessageConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.Mode;
+
+public class FeedCongestionMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+ private final FeedRuntimeId runtimeId;
+ private int inflowRate;
+ private int outflowRate;
+ private Mode mode;
+
+ public FeedCongestionMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, int inflowRate,
+ int outflowRate, Mode mode) {
+ super(MessageType.CONGESTION);
+ this.connectionId = connectionId;
+ this.runtimeId = runtimeId;
+ this.inflowRate = inflowRate;
+ this.outflowRate = outflowRate;
+ this.mode = mode;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
+ obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
+ obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+ obj.put(FeedConstants.MessageConstants.INFLOW_RATE, inflowRate);
+ obj.put(FeedConstants.MessageConstants.OUTFLOW_RATE, outflowRate);
+ obj.put(FeedConstants.MessageConstants.MODE, mode);
+ return obj;
+ }
+
+ public FeedRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
+ public int getInflowRate() {
+ return inflowRate;
+ }
+
+ public int getOutflowRate() {
+ return outflowRate;
+ }
+
+ public static FeedCongestionMessage read(JSONObject obj) throws JSONException {
+ FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+ obj.getString(FeedConstants.MessageConstants.FEED));
+ FeedConnectionId connectionId = new FeedConnectionId(feedId,
+ obj.getString(FeedConstants.MessageConstants.DATASET));
+ FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
+ .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+ obj.getInt(FeedConstants.MessageConstants.PARTITION),
+ obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
+ Mode mode = Mode.valueOf(obj.getString(MessageConstants.MODE));
+ return new FeedCongestionMessage(connectionId, runtimeId,
+ obj.getInt(FeedConstants.MessageConstants.INFLOW_RATE),
+ obj.getInt(FeedConstants.MessageConstants.OUTFLOW_RATE), mode);
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java
new file mode 100644
index 0000000..62e774c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.hyracks.api.dataflow.value.JSONSerializable;
+
+/**
+ * Common base class for feed control messages. It stores the
+ * {@code MessageType} tag of the message; producing the JSON form is left
+ * to the concrete subclasses.
+ */
+public abstract class FeedMessage implements IFeedMessage, JSONSerializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Kind of control message; assigned once in the constructor. */
+    protected final MessageType messageType;
+
+    /**
+     * @param messageType
+     *            the kind of control message being created
+     */
+    public FeedMessage(MessageType messageType) {
+        this.messageType = messageType;
+    }
+
+    /**
+     * @return the kind of control message this instance represents
+     */
+    public MessageType getMessageType() {
+        return this.messageType;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java
new file mode 100644
index 0000000..51a950b
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedMessageService.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+
+/**
+ * Forwards feed messages produced on this node to a remote endpoint.
+ * <p>
+ * {@link #sendMessage(IFeedMessage)} converts a message to JSON, tags it with
+ * the local node id and the message type, and places the resulting string in
+ * an in-memory queue. A single background thread drains the queue and writes
+ * each entry as one newline-terminated line to a socket opened against the
+ * host and port supplied at construction time.
+ */
+public class FeedMessageService implements IFeedMessageService {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
+
+    private final LinkedBlockingQueue<String> inbox;
+    private final FeedMessageHandler mesgHandler;
+    private final String nodeId;
+    private final ExecutorService executor;
+
+    /**
+     * @param feedProperties
+     *            source of the port ({@code getFeedCentralManagerPort()}) to connect to
+     * @param nodeId
+     *            id attached to every outgoing message
+     * @param ccClusterIp
+     *            host to connect to
+     */
+    public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
+        this.inbox = new LinkedBlockingQueue<String>();
+        this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
+        this.nodeId = nodeId;
+        this.executor = Executors.newSingleThreadExecutor();
+    }
+
+    /**
+     * Starts the background thread that connects to the remote endpoint and
+     * begins draining the queue.
+     */
+    public void start() throws Exception {
+        executor.execute(mesgHandler);
+    }
+
+    /**
+     * Interrupts the background thread and closes the socket.
+     */
+    public void stop() {
+        // Taking the handler's lock guarantees the interrupt is not issued
+        // while a message line is only partially written.
+        synchronized (mesgHandler.getLock()) {
+            executor.shutdownNow();
+        }
+        mesgHandler.stop();
+    }
+
+    /**
+     * Serializes the message and enqueues it for delivery. A message that
+     * cannot be converted to JSON is logged and dropped.
+     *
+     * @param message
+     *            the message to deliver
+     */
+    @Override
+    public void sendMessage(IFeedMessage message) {
+        try {
+            JSONObject obj = message.toJSON();
+            obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
+            obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
+            inbox.add(obj.toString());
+        } catch (JSONException jse) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                // Attach the exception so the failing key/value is visible in the log.
+                LOGGER.log(Level.WARNING, "JSON Exception in parsing message " + message, jse);
+            }
+        }
+    }
+
+    /**
+     * Queue consumer: opens the socket, then blocks on the queue and writes
+     * every message followed by a line terminator.
+     */
+    private static class FeedMessageHandler implements Runnable {
+
+        private static final byte[] EOL = "\n".getBytes();
+
+        private final LinkedBlockingQueue<String> inbox;
+        private final String host;
+        private final int port;
+        private final Object lock;
+
+        // Written by the handler thread, read by whichever thread calls stop().
+        private volatile Socket cfmSocket;
+
+        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
+            this.inbox = inbox;
+            this.host = host;
+            this.port = port;
+            this.lock = new Object();
+        }
+
+        @Override
+        public void run() {
+            try {
+                // A failed connection raises an exception (handled below);
+                // the constructor never yields null, so no null check is needed.
+                Socket socket = new Socket(host, port);
+                cfmSocket = socket;
+                while (true) {
+                    String message = inbox.take();
+                    synchronized (lock) {
+                        // NOTE(review): getBytes() uses the platform default charset;
+                        // left unchanged because the reader's decoding is not visible here.
+                        socket.getOutputStream().write(message.getBytes());
+                        socket.getOutputStream().write(EOL);
+                    }
+                }
+            } catch (InterruptedException ie) {
+                // Raised by inbox.take() when the executor is shut down: this is
+                // the regular termination path. Restore the flag for the pool.
+                Thread.currentThread().interrupt();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Feed message handler interrupted");
+                }
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Exception in handling incoming feed messages: " + e.getMessage(), e);
+                }
+            } finally {
+                stop();
+            }
+
+        }
+
+        /**
+         * Closes the socket if one was opened. May be invoked more than once.
+         */
+        public void stop() {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Stopping feed message handler");
+            }
+            Socket socket = cfmSocket;
+            if (socket != null) {
+                try {
+                    socket.close();
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Exception in closing socket " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * @return the monitor held while a message line is being written
+         */
+        public Object getLock() {
+            return lock;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java
new file mode 100644
index 0000000..128803a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedReportMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.MessageConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector.ValueType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * A feed message reporting a single integer value of a given
+ * {@code ValueType} for one runtime of a feed connection.
+ */
+public class FeedReportMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final FeedRuntimeId runtimeId;
+    private final ValueType valueType;
+    private int value;
+
+    /**
+     * @param connectionId
+     *            the feed connection the report belongs to
+     * @param runtimeId
+     *            the runtime the report belongs to
+     * @param valueType
+     *            the kind of value being reported
+     * @param value
+     *            the reported value
+     */
+    public FeedReportMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType, int value) {
+        super(MessageType.FEED_REPORT);
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+        this.valueType = valueType;
+        this.value = value;
+    }
+
+    /**
+     * Overwrites the reported value so the instance can be reused.
+     *
+     * @param value
+     *            the new value
+     */
+    public void reset(int value) {
+        this.value = value;
+    }
+
+    /**
+     * @return a JSON object with the message type, dataverse, feed, dataset,
+     *         runtime type, partition, value type and value
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
+        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+        obj.put(FeedConstants.MessageConstants.VALUE_TYPE, valueType);
+        obj.put(FeedConstants.MessageConstants.VALUE, value);
+        return obj;
+    }
+
+    /**
+     * Rebuilds a {@code FeedReportMessage} from a JSON object. The operand id
+     * is not part of the JSON form; the runtime id is created with
+     * {@code MessageConstants.NOT_APPLICABLE} in its place.
+     *
+     * @param obj
+     *            the JSON object to decode
+     * @return a new message populated from {@code obj}
+     * @throws JSONException
+     *             if a value cannot be retrieved from {@code obj}
+     */
+    public static FeedReportMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
+                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+                obj.getInt(FeedConstants.MessageConstants.PARTITION), FeedConstants.MessageConstants.NOT_APPLICABLE);
+        ValueType type = ValueType.valueOf(obj.getString(MessageConstants.VALUE_TYPE));
+        // toJSON() stores the value as a number, so read it back as one.
+        // getInt also accepts a numeric string, whereas getString on a
+        // numeric entry is rejected by some org.json releases.
+        int value = obj.getInt(MessageConstants.VALUE);
+        return new FeedReportMessage(connectionId, runtimeId, type, value);
+    }
+
+    /**
+     * @return the reported value
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * @param value
+     *            the new reported value
+     */
+    public void setValue(int value) {
+        this.value = value;
+    }
+
+    /**
+     * @return the feed connection the report belongs to
+     */
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @return the runtime the report belongs to
+     */
+    public FeedRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    /**
+     * @return the kind of value being reported
+     */
+    public ValueType getValueType() {
+        return valueType;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
new file mode 100644
index 0000000..cacfeb2
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitAckMessage.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+/**
+ * A feed message of type {@code COMMIT_ACK} carrying, for one intake
+ * partition of a feed connection, a base value and a byte array of commit
+ * acknowledgements. The byte array travels Base64-encoded in the JSON form.
+ */
+public class FeedTupleCommitAckMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private int intakePartition;
+    private int base;
+    private byte[] commitAcks;
+
+    /**
+     * @param connectionId
+     *            the feed connection the acknowledgements belong to
+     * @param intakePartition
+     *            the intake partition the acknowledgements belong to
+     * @param base
+     *            the base value stored with the acknowledgements
+     * @param commitAcks
+     *            the acknowledgement bytes (stored by reference, not copied)
+     */
+    public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
+        super(MessageType.COMMIT_ACK);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    /**
+     * @return a JSON object with the message type, dataverse, feed, dataset,
+     *         intake partition, base and the Base64 text of the acknowledgements
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        final FeedId feedId = connectionId.getFeedId();
+        final JSONObject json = new JSONObject();
+        json.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        json.put(FeedConstants.MessageConstants.DATAVERSE, feedId.getDataverse());
+        json.put(FeedConstants.MessageConstants.FEED, feedId.getFeedName());
+        json.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        json.put(FeedConstants.MessageConstants.BASE, base);
+        json.put(FeedConstants.MessageConstants.COMMIT_ACKS, DatatypeConverter.printBase64Binary(commitAcks));
+        return json;
+    }
+
+    /**
+     * Rebuilds a {@code FeedTupleCommitAckMessage} from a JSON object.
+     *
+     * @param obj
+     *            the JSON object to decode
+     * @return a new message populated from {@code obj}
+     * @throws JSONException
+     *             if a value cannot be retrieved from {@code obj}
+     */
+    public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
+        String dataverse = obj.getString(FeedConstants.MessageConstants.DATAVERSE);
+        String feedName = obj.getString(FeedConstants.MessageConstants.FEED);
+        FeedId feedId = new FeedId(dataverse, feedName);
+        String dataset = obj.getString(FeedConstants.MessageConstants.DATASET);
+        FeedConnectionId connectionId = new FeedConnectionId(feedId, dataset);
+
+        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        int base = obj.getInt(FeedConstants.MessageConstants.BASE);
+        byte[] commitAcks = DatatypeConverter.parseBase64Binary(obj
+                .getString(FeedConstants.MessageConstants.COMMIT_ACKS));
+        return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
+    }
+
+    /**
+     * @return the feed connection the acknowledgements belong to
+     */
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /**
+     * @return the intake partition the acknowledgements belong to
+     */
+    public int getIntakePartition() {
+        return this.intakePartition;
+    }
+
+    /**
+     * @return the internal acknowledgement byte array (not a copy)
+     */
+    public byte[] getCommitAcks() {
+        return this.commitAcks;
+    }
+
+    /**
+     * Replaces all mutable fields so the instance can be reused.
+     */
+    public void reset(int intakePartition, int base, byte[] commitAcks) {
+        this.intakePartition = intakePartition;
+        this.base = base;
+        this.commitAcks = commitAcks;
+    }
+
+    /**
+     * @return the base value stored with the acknowledgements
+     */
+    public int getBase() {
+        return this.base;
+    }
+
+    /**
+     * @param base
+     *            the new base value
+     */
+    public void setBase(int base) {
+        this.base = base;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
new file mode 100644
index 0000000..663a629
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/FeedTupleCommitResponseMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+/**
+ * A feed message of type {@code COMMIT_ACK_RESPONSE} carrying, for one
+ * intake partition of a feed connection, the maximum acknowledged window.
+ */
+public class FeedTupleCommitResponseMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final int intakePartition;
+    private final int maxWindowAcked;
+
+    /**
+     * @param connectionId
+     *            the feed connection the response belongs to
+     * @param intakePartition
+     *            the intake partition the response belongs to
+     * @param maxWindowAcked
+     *            the maximum acknowledged window value
+     */
+    public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
+        super(MessageType.COMMIT_ACK_RESPONSE);
+        this.connectionId = connectionId;
+        this.intakePartition = intakePartition;
+        this.maxWindowAcked = maxWindowAcked;
+    }
+
+    /**
+     * @return a JSON object with the message type, dataverse, feed, dataset,
+     *         intake partition and maximum acknowledged window
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        final FeedId feedId = connectionId.getFeedId();
+        final JSONObject json = new JSONObject();
+        json.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        json.put(FeedConstants.MessageConstants.DATAVERSE, feedId.getDataverse());
+        json.put(FeedConstants.MessageConstants.FEED, feedId.getFeedName());
+        json.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        json.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
+        return json;
+    }
+
+    /**
+     * Rebuilds a {@code FeedTupleCommitResponseMessage} from a JSON object.
+     *
+     * @param obj
+     *            the JSON object to decode
+     * @return a new message populated from {@code obj}
+     * @throws JSONException
+     *             if a value cannot be retrieved from {@code obj}
+     */
+    public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
+        String dataverse = obj.getString(FeedConstants.MessageConstants.DATAVERSE);
+        String feedName = obj.getString(FeedConstants.MessageConstants.FEED);
+        FeedId feedId = new FeedId(dataverse, feedName);
+        String dataset = obj.getString(FeedConstants.MessageConstants.DATASET);
+        FeedConnectionId connectionId = new FeedConnectionId(feedId, dataset);
+
+        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
+        int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
+        return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
+    }
+
+    /**
+     * @return the feed connection the response belongs to
+     */
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /**
+     * @return the maximum acknowledged window value
+     */
+    public int getMaxWindowAcked() {
+        return this.maxWindowAcked;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java
new file mode 100644
index 0000000..32c35f3
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/NodeReportMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+
+/**
+ * A feed message of type {@code NODE_REPORT} carrying a CPU load figure, a
+ * used-heap figure and a runtime count.
+ */
+public class NodeReportMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private double cpuLoad;
+    private double usedHeap;
+    private int nRuntimes;
+
+    /**
+     * @param cpuLoad
+     *            the CPU load to report (widened to {@code double} for storage)
+     * @param usedHeap
+     *            the used heap to report (widened to {@code double} for storage)
+     * @param nRuntimes
+     *            the runtime count to report
+     */
+    public NodeReportMessage(float cpuLoad, long usedHeap, int nRuntimes) {
+        super(IFeedMessage.MessageType.NODE_REPORT);
+        this.usedHeap = usedHeap;
+        this.cpuLoad = cpuLoad;
+        this.nRuntimes = nRuntimes;
+    }
+
+    /**
+     * Replaces all reported figures so the instance can be reused.
+     */
+    public void reset(double cpuLoad, double usedHeap, int nRuntimes) {
+        this.cpuLoad = cpuLoad;
+        this.usedHeap = usedHeap;
+        this.nRuntimes = nRuntimes;
+    }
+
+    /**
+     * @return a JSON object with the message type, CPU load, heap usage and
+     *         runtime count
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        // Include the type tag so the JSON form identifies itself even when
+        // it is not produced through a code path that adds the tag afterwards.
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.CPU_LOAD, cpuLoad);
+        obj.put(FeedConstants.MessageConstants.HEAP_USAGE, usedHeap);
+        obj.put(FeedConstants.MessageConstants.N_RUNTIMES, nRuntimes);
+        return obj;
+    }
+
+    /**
+     * @return the reported CPU load
+     */
+    public double getCpuLoad() {
+        return cpuLoad;
+    }
+
+    /**
+     * @return the reported used heap
+     */
+    public double getUsedHeap() {
+        return usedHeap;
+    }
+
+    /**
+     * @return the reported runtime count
+     */
+    public int getnRuntimes() {
+        return nRuntimes;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java
new file mode 100644
index 0000000..7acc20f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ScaleInReportMessage.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * A feed control message indicating the need to scale in a stage of the feed ingestion pipeline.
+ * Currently, scaling-in of the compute stage is supported.
+ **/
+public class ScaleInReportMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+
+ private final FeedRuntimeType runtimeType;
+
+ private int currentCardinality;
+
+ private int reducedCardinaliy;
+
+ public ScaleInReportMessage(FeedConnectionId connectionId, FeedRuntimeType runtimeType, int currentCardinality,
+ int reducedCardinaliy) {
+ super(MessageType.SCALE_IN_REQUEST);
+ this.connectionId = connectionId;
+ this.runtimeType = runtimeType;
+ this.currentCardinality = currentCardinality;
+ this.reducedCardinaliy = reducedCardinaliy;
+ }
+
+ @Override
+ public String toString() {
+ return MessageType.SCALE_IN_REQUEST.name() + " " + connectionId + " [" + runtimeType + "] "
+ + " currentCardinality " + currentCardinality + " reducedCardinality " + reducedCardinaliy;
+ }
+
+ public FeedRuntimeType getRuntimeType() {
+ return runtimeType;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeType);
+ obj.put(FeedConstants.MessageConstants.CURRENT_CARDINALITY, currentCardinality);
+ obj.put(FeedConstants.MessageConstants.REDUCED_CARDINALITY, reducedCardinaliy);
+ return obj;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public static ScaleInReportMessage read(JSONObject obj) throws JSONException {
+ FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+ obj.getString(FeedConstants.MessageConstants.FEED));
+ FeedConnectionId connectionId = new FeedConnectionId(feedId,
+ obj.getString(FeedConstants.MessageConstants.DATASET));
+ FeedRuntimeType runtimeType = FeedRuntimeType.valueOf(obj
+ .getString(FeedConstants.MessageConstants.RUNTIME_TYPE));
+ return new ScaleInReportMessage(connectionId, runtimeType,
+ obj.getInt(FeedConstants.MessageConstants.CURRENT_CARDINALITY),
+ obj.getInt(FeedConstants.MessageConstants.REDUCED_CARDINALITY));
+ }
+
+ public void reset(int currentCardinality, int reducedCardinaliy) {
+ this.currentCardinality = currentCardinality;
+ this.reducedCardinaliy = reducedCardinaliy;
+ }
+
+ public int getCurrentCardinality() {
+ return currentCardinality;
+ }
+
+ public void setCurrentCardinality(int currentCardinality) {
+ this.currentCardinality = currentCardinality;
+ }
+
+ public int getReducedCardinaliy() {
+ return reducedCardinaliy;
+ }
+
+ public void setReducedCardinaliy(int reducedCardinaliy) {
+ this.reducedCardinaliy = reducedCardinaliy;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java
new file mode 100644
index 0000000..502eb98
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/StorageReportFeedMessage.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedConstants.MessageConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+
+/**
+ * A feed control message sent from a storage runtime of a feed pipeline to report the intake timestamp
+ * corresponding to the last persisted tuple. It also carries the reporting partition, the intake
+ * partition, a flag telling whether the persistence delay is within limit, and an average delay.
+ */
+public class StorageReportFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final int partition;
+    private long lastPersistedTupleIntakeTimestamp;
+    private boolean persistenceDelayWithinLimit;
+    private long averageDelay;
+    private int intakePartition;
+
+    /**
+     * @param connectionId
+     *            the feed connection the report belongs to
+     * @param partition
+     *            the partition the report belongs to
+     * @param lastPersistedTupleIntakeTimestamp
+     *            the intake timestamp of the last persisted tuple
+     * @param persistenceDelayWithinLimit
+     *            whether the persistence delay is within limit
+     * @param averageDelay
+     *            the average delay
+     * @param intakePartition
+     *            the intake partition the report refers to
+     */
+    public StorageReportFeedMessage(FeedConnectionId connectionId, int partition,
+            long lastPersistedTupleIntakeTimestamp, boolean persistenceDelayWithinLimit, long averageDelay,
+            int intakePartition) {
+        super(MessageType.STORAGE_REPORT);
+        this.connectionId = connectionId;
+        this.partition = partition;
+        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+        this.averageDelay = averageDelay;
+        this.intakePartition = intakePartition;
+    }
+
+    /**
+     * @return a one-line description listing the message type name, the
+     *         connection and the last persisted intake timestamp
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append(messageType.name());
+        text.append(" ").append(connectionId);
+        text.append(" [").append(lastPersistedTupleIntakeTimestamp).append("] ");
+        return text.toString();
+    }
+
+    /**
+     * @return the feed connection the report belongs to
+     */
+    public FeedConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /**
+     * @return the intake timestamp of the last persisted tuple
+     */
+    public long getLastPersistedTupleIntakeTimestamp() {
+        return this.lastPersistedTupleIntakeTimestamp;
+    }
+
+    /**
+     * @return the partition the report belongs to
+     */
+    public int getPartition() {
+        return this.partition;
+    }
+
+    /**
+     * @return whether the persistence delay is within limit
+     */
+    public boolean isPersistenceDelayWithinLimit() {
+        return this.persistenceDelayWithinLimit;
+    }
+
+    /**
+     * @param persistenceDelayWithinLimit
+     *            the new value of the within-limit flag
+     */
+    public void setPersistenceDelayWithinLimit(boolean persistenceDelayWithinLimit) {
+        this.persistenceDelayWithinLimit = persistenceDelayWithinLimit;
+    }
+
+    /**
+     * @return the average delay
+     */
+    public long getAverageDelay() {
+        return this.averageDelay;
+    }
+
+    /**
+     * @param averageDelay
+     *            the new average delay
+     */
+    public void setAverageDelay(long averageDelay) {
+        this.averageDelay = averageDelay;
+    }
+
+    /**
+     * @return the intake partition the report refers to
+     */
+    public int getIntakePartition() {
+        return this.intakePartition;
+    }
+
+    /**
+     * @return a JSON object with the message type, dataverse, feed, dataset,
+     *         last persisted intake timestamp, within-limit flag, average
+     *         delay, partition and intake partition
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        final FeedId feedId = connectionId.getFeedId();
+        final JSONObject json = new JSONObject();
+        json.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        json.put(FeedConstants.MessageConstants.DATAVERSE, feedId.getDataverse());
+        json.put(FeedConstants.MessageConstants.FEED, feedId.getFeedName());
+        json.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        json.put(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP,
+                lastPersistedTupleIntakeTimestamp);
+        json.put(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT, persistenceDelayWithinLimit);
+        json.put(MessageConstants.AVERAGE_PERSISTENCE_DELAY, averageDelay);
+        json.put(FeedConstants.MessageConstants.PARTITION, partition);
+        json.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
+        return json;
+    }
+
+    /**
+     * Rebuilds a {@code StorageReportFeedMessage} from a JSON object.
+     *
+     * @param obj
+     *            the JSON object to decode
+     * @return a new message populated from {@code obj}
+     * @throws JSONException
+     *             if a value cannot be retrieved from {@code obj}
+     */
+    public static StorageReportFeedMessage read(JSONObject obj) throws JSONException {
+        String dataverse = obj.getString(FeedConstants.MessageConstants.DATAVERSE);
+        String feedName = obj.getString(FeedConstants.MessageConstants.FEED);
+        FeedId feedId = new FeedId(dataverse, feedName);
+        String dataset = obj.getString(FeedConstants.MessageConstants.DATASET);
+        FeedConnectionId connectionId = new FeedConnectionId(feedId, dataset);
+
+        int partition = obj.getInt(FeedConstants.MessageConstants.PARTITION);
+        long timestamp = obj.getLong(FeedConstants.MessageConstants.LAST_PERSISTED_TUPLE_INTAKE_TIMESTAMP);
+        boolean withinLimit = obj.getBoolean(MessageConstants.PERSISTENCE_DELAY_WITHIN_LIMIT);
+        long averageDelay = obj.getLong(MessageConstants.AVERAGE_PERSISTENCE_DELAY);
+        int intakePartition = obj.getInt(MessageConstants.INTAKE_PARTITION);
+        return new StorageReportFeedMessage(connectionId, partition, timestamp, withinLimit, averageDelay,
+                intakePartition);
+    }
+
+    /**
+     * Replaces the timestamp, the within-limit flag and the average delay so
+     * the instance can be reused.
+     */
+    public void reset(long lastPersistedTupleIntakeTimestamp, boolean delayWithinLimit, long averageDelay) {
+        this.lastPersistedTupleIntakeTimestamp = lastPersistedTupleIntakeTimestamp;
+        this.persistenceDelayWithinLimit = delayWithinLimit;
+        this.averageDelay = averageDelay;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
new file mode 100644
index 0000000..4bb750b
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/feeds/message/ThrottlingEnabledFeedMessage.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.feeds.message;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+
+/**
+ * A feed control message of type {@code THROTTLING_ENABLED}. It carries the feed connection
+ * and the feed runtime (runtime type, operand id and partition) that the message refers to,
+ * and can be converted to and from a JSON representation.
+ */
+public class ThrottlingEnabledFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Feed connection (dataverse, feed and dataset) this message refers to. */
+    private final FeedConnectionId connectionId;
+
+    /** Feed runtime this message refers to. */
+    private final FeedRuntimeId runtimeId;
+
+    /**
+     * Creates a message of type {@code THROTTLING_ENABLED}.
+     *
+     * @param connectionId
+     *            the feed connection the message refers to
+     * @param runtimeId
+     *            the feed runtime the message refers to
+     */
+    public ThrottlingEnabledFeedMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId) {
+        super(MessageType.THROTTLING_ENABLED);
+        this.connectionId = connectionId;
+        this.runtimeId = runtimeId;
+    }
+
+    /**
+     * Returns the message type name followed by the connection id and the runtime id.
+     */
+    @Override
+    public String toString() {
+        // Report this message's own type; a hard-coded END label would mislabel it in logs.
+        return messageType.name() + " " + connectionId + " [" + runtimeId + "] ";
+    }
+
+    /**
+     * Serializes this message into a JSON object holding the message type, dataverse, feed,
+     * dataset, runtime type, operand id and partition.
+     *
+     * @return the JSON representation of this message
+     * @throws JSONException
+     *             if an entry cannot be added to the JSON object
+     */
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        // Store the enum constant's name so that read() can fetch it with getString()
+        // and resolve it through FeedRuntimeType.valueOf().
+        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType().name());
+        obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
+        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
+        return obj;
+    }
+
+    /**
+     * @return the feed connection this message refers to
+     */
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @return the feed runtime this message refers to
+     */
+    public FeedRuntimeId getFeedRuntimeId() {
+        return runtimeId;
+    }
+
+    /**
+     * Rebuilds a {@link ThrottlingEnabledFeedMessage} from the JSON form written by
+     * {@link #toJSON()}.
+     *
+     * @param obj
+     *            JSON object holding the dataverse, feed, dataset, runtime type, partition and
+     *            operand id entries
+     * @return a new message populated with the values read from {@code obj}
+     * @throws JSONException
+     *             if {@code obj} cannot supply one of the entries in the requested type
+     */
+    public static ThrottlingEnabledFeedMessage read(JSONObject obj) throws JSONException {
+        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
+                obj.getString(FeedConstants.MessageConstants.FEED));
+        FeedConnectionId connectionId = new FeedConnectionId(feedId,
+                obj.getString(FeedConstants.MessageConstants.DATASET));
+        FeedRuntimeId runtimeId = new FeedRuntimeId(
+                FeedRuntimeType.valueOf(obj.getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
+                obj.getInt(FeedConstants.MessageConstants.PARTITION),
+                obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
+        return new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java
new file mode 100644
index 0000000..0a03d79
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IAsterixTupleParser.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.common.parse;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * An {@link ITupleParser} that can additionally be handed a string-keyed configuration.
+ */
+public interface IAsterixTupleParser extends ITupleParser {
+
+    /**
+     * Supplies configuration parameters to this parser.
+     *
+     * @param configuration
+     *            configuration entries as name/value string pairs
+     */
+    void configure(Map<String, String> configuration);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java
new file mode 100644
index 0000000..e7fdc74
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleForwardPolicy.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.asterix.common.parse;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Contract for a component that receives tuples one at a time through
+ * {@link #addTuple(ArrayTupleBuilder)} after being configured and initialized with a task
+ * context and a frame writer.
+ * <p>
+ * NOTE(review): presumably an implementation decides when the collected tuples are pushed to
+ * the frame writer, according to its {@link TupleForwardPolicyType} - confirm against the
+ * implementations.
+ */
+public interface ITupleForwardPolicy {
+
+    /**
+     * The string {@code "parser-policy"}.
+     * NOTE(review): presumably the configuration key that selects the policy - confirm against callers.
+     */
+    String PARSER_POLICY = "parser-policy";
+
+    /** The kinds of forward policy an implementation can report through {@link #getType()}. */
+    enum TupleForwardPolicyType {
+        FRAME_FULL,
+        COUNTER_TIMER_EXPIRED,
+        RATE_CONTROLLED
+    }
+
+    /**
+     * Supplies configuration parameters to this policy.
+     *
+     * @param configuration
+     *            configuration entries as name/value string pairs
+     */
+    void configure(Map<String, String> configuration);
+
+    /**
+     * Hands this policy the task context and the frame writer it works with.
+     *
+     * @param ctx
+     *            the Hyracks task context
+     * @param frameWriter
+     *            the frame writer
+     * @throws HyracksDataException
+     *             if initialization fails
+     */
+    void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
+
+    /**
+     * @return the kind of forward policy this instance represents
+     */
+    TupleForwardPolicyType getType();
+
+    /**
+     * Passes a tuple to this policy.
+     *
+     * @param tb
+     *            builder holding the tuple
+     * @throws HyracksDataException
+     *             if the tuple cannot be handled
+     */
+    void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
+
+    /**
+     * Closes this policy.
+     *
+     * @throws HyracksDataException
+     *             if closing fails
+     */
+    void close() throws HyracksDataException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java
new file mode 100644
index 0000000..652139c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/ITupleParserPolicy.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.asterix.common.parse;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Contract for a component that receives parsed tuples one at a time through
+ * {@link #addTuple(ArrayTupleBuilder)} after being configured and initialized with a task
+ * context and a frame writer.
+ * <p>
+ * NOTE(review): presumably an implementation decides when the collected tuples are pushed to
+ * the writer, according to its {@link TupleParserPolicy} - confirm against the implementations.
+ */
+public interface ITupleParserPolicy {
+
+    /** The kinds of parser policy an implementation can report through {@link #getType()}. */
+    enum TupleParserPolicy {
+        FRAME_FULL,
+        TIME_COUNT_ELAPSED,
+        RATE_CONTROLLED
+    }
+
+    /**
+     * @return the kind of parser policy this instance represents
+     */
+    TupleParserPolicy getType();
+
+    /**
+     * Supplies configuration parameters to this policy.
+     *
+     * @param configuration
+     *            configuration entries as name/value string pairs
+     * @throws HyracksDataException
+     *             if the configuration cannot be applied
+     */
+    void configure(Map<String, String> configuration) throws HyracksDataException;
+
+    /**
+     * Hands this policy the task context and the frame writer it works with.
+     *
+     * @param ctx
+     *            the Hyracks task context
+     * @param writer
+     *            the frame writer
+     * @throws HyracksDataException
+     *             if initialization fails
+     */
+    void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException;
+
+    /**
+     * Passes a tuple to this policy.
+     *
+     * @param tb
+     *            builder holding the tuple
+     * @throws HyracksDataException
+     *             if the tuple cannot be handled
+     */
+    void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
+
+    /**
+     * Closes this policy.
+     *
+     * @throws HyracksDataException
+     *             if closing fails
+     */
+    void close() throws HyracksDataException;
+}