You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:14 UTC

[17/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
new file mode 100644
index 0000000..9ea9b47
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
@@ -0,0 +1,42 @@
+/*
+ * Description  : Create a feed using the synthetic feed simulator adapter. 
+                  Create a dataset that has an associated btree index.
+                  The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed. 
+                  The feed lasts a configured duration with data arriving at a configured rate (tweets per second). 
+                  Verify the existence of data after the feed finishes.
+ * Issue        : 711                 
+ * Expected Res : Success
+ * Date         : 8th Feb 2014
+ */
+
+drop dataverse feeds_09 if exists;
+create dataverse feeds_09;
+use dataverse feeds_09;
+
+create type TwitterUserType as closed {
+	screen_name: string,
+	language: string,
+	friends_count: int32,
+	status_count: int32,
+	name: string,
+	followers_count: int32
+} 
+
+create type TweetMessageType as closed {
+    id: int64,
+    user: TwitterUserType,
+    latitude: double,
+    longitude: double,
+    message_text: string,
+    created_at: string,
+    country: string
+}
+
+create dataset SyntheticTweets(TweetMessageType)
+primary key id;
+
+create index message_text on SyntheticTweets(message_text) type btree;
+
+create feed  SyntheticTweetFeed
+using twitter_firehose
+(("duration"="5"),("tps"="50"),("tput-duration"="5"),("type-name"="TweetMessageType"),("dataverse-dataset"="feeds:SyntheticTweets"),("mode"="controlled"));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.2.update.aql
new file mode 100644
index 0000000..951f571
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.2.update.aql
@@ -0,0 +1,16 @@
+/*
+ * Description  : Create a feed using the synthetic feed simulator adapter.                   
+                  Create a dataset that has an associated btree index.
+                  The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+                  The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+                  Verify the existence of data after the feed finishes.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 2nd April 2014
+ */
+  
+use dataverse feeds_09;
+
+set wait-for-completion-feed "true";
+
+connect feed SyntheticTweetFeed to dataset SyntheticTweets;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.3.query.aql
new file mode 100644
index 0000000..693f016
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.3.query.aql
@@ -0,0 +1,22 @@
+/*
+ * Description  : Create a feed using the synthetic feed simulator adapter.                   
+                  Create a dataset that has an associated btree index.
+                  The synthetic feed simulator uses the Social-Data generator to generate data and simulate a feed.
+                  The feed lasts a configured duration with data arriving at a configured rate (tweets per second).
+                  Verify the existence of data after the feed finishes.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 2nd Feb 2014
+ */
+
+use dataverse feeds_09;
+
+let $totalTweets:=count(
+for $x in dataset('SyntheticTweets')
+return $x)
+return 
+(if($totalTweets > 0)
+  then 1
+else
+  0
+)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.1.ddl.aql
new file mode 100644
index 0000000..13252a9
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.1.ddl.aql
@@ -0,0 +1,30 @@
+/*
+ * Description  : Create a dataset with a secondary btree index.
+                  Create a feed that uses the file_feed adapter.
+                  The file_feed adapter simulates a feed from a file (here, one on the local file system).
+                  Connect the feed to the dataset and verify contents of the dataset post completion.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 6th Feb 2014
+ */
+
+drop dataverse feeds_10 if exists;
+create dataverse feeds_10;
+use dataverse feeds_10;
+
+create type TweetType as closed {
+  id: string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string
+}      
+
+create dataset Tweets(TweetType)
+primary key id;
+
+create index usernameIdx on Tweets(username) type btree;
+
+create feed TweetFeed
+using file_feed
+(("fs"="localfs"),("path"="nc1://data/twitter/obamatweets.adm"),("format"="adm"),("type-name"="TweetType"),("tuple-interval"="10"));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.2.update.aql
new file mode 100644
index 0000000..a99d234
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.2.update.aql
@@ -0,0 +1,15 @@
+/*
+ * Description  : Create a dataset with a secondary btree index.
+                  Create a feed that uses the file_feed adapter.
+                  The file_feed adapter simulates a feed from a file (here, one on the local file system).
+                  Connect the feed to the dataset and verify contents of the dataset post completion.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 6th Feb 2014
+ */
+  
+use dataverse feeds_10;
+
+set wait-for-completion-feed "true";
+
+connect feed TweetFeed to dataset Tweets;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.3.query.aql
new file mode 100644
index 0000000..b7fa39a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_10/feeds_10.3.query.aql
@@ -0,0 +1,16 @@
+/*
+ * Description  : Create a dataset with a secondary btree index.
+                  Create a feed that uses the file_feed adapter.
+                  The file_feed adapter simulates a feed from a file (here, one on the local file system).
+                  Connect the feed to the dataset and verify contents of the dataset post completion.
+ * Issue        : 711
+ * Expected Res : Success
+ * Date         : 6th Feb 2014
+ */
+use dataverse feeds_10;
+
+count(
+for $x in dataset('Tweets')
+order by $x.id
+return $x
+)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.1.ddl.aql
new file mode 100644
index 0000000..d5a1c92
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.1.ddl.aql
@@ -0,0 +1,27 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter. 
+                  Begin ingestion using a fully qualified name and verify contents of the dataset post completion.  
+ * Expected Res : Success
+ * Date         : 24th Dec 2012
+ */
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+
+create type TweetType as closed {
+  id: string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string
+}      
+
+create dataset Tweets(TweetType)
+primary key id;
+
+create feed TweetFeed
+using file_feed
+(("fs"="localfs"),("path"="nc1://data/twitter/obamatweets.adm"),("format"="adm"),("type-name"="TweetType"),("tuple-interval"="10"));
+
+

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.2.update.aql
new file mode 100644
index 0000000..fc71769
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.2.update.aql
@@ -0,0 +1,12 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter. 
+                  Begin ingestion using a fully qualified name and verify contents of the dataset post completion.  
+ * Expected Res : Success
+ * Date         : 24th Dec 2012
+ */
+
+use dataverse feeds;
+
+set wait-for-completion-feed "true";
+ 
+connect feed feeds.TweetFeed to dataset Tweets;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.3.query.aql
new file mode 100644
index 0000000..227913d
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_11/feeds_11.3.query.aql
@@ -0,0 +1,11 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter. 
+                  Begin ingestion using a fully qualified name and verify contents of the dataset post completion.  
+ * Expected Res : Success
+ * Date         : 24th Dec 2012
+ */
+use dataverse feeds;
+
+for $x in dataset('Tweets')
+order by $x.id
+return $x

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.1.ddl.aql
new file mode 100644
index 0000000..431973b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.1.ddl.aql
@@ -0,0 +1,30 @@
+/*
+ * Description  : Create a feed from the contents of  a file using a file_feed adaptor. 
+                  The contents here contain a duplicate record. Since default ingestion policy requires the feed
+                  to recover from failures, feed ingestion should be able to skip the tuple with the duplicate key
+                  without an abort. 
+                  Verify that all but the duplicate record are inserted into the target dataset.
+ * Expected Res : Success
+ * Date         : 3rd Apr 2014
+ */
+drop dataverse feeds_12 if exists;
+create dataverse feeds_12;
+use dataverse feeds_12;
+
+
+create type TweetType as closed {
+  id: string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string
+}      
+
+create dataset Tweets(TweetType)
+primary key id;
+
+create feed TweetFeed
+using file_feed
+(("fs"="localfs"),("path"="nc1://data/twitter/obamatweets_duplicate.adm"),("format"="adm"),("type-name"="TweetType"),("tuple-interval"="10"));
+
+

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.2.update.aql
new file mode 100644
index 0000000..8a127ed
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.2.update.aql
@@ -0,0 +1,12 @@
+/*
+ * Description  : Create a feed from the contents of a file (containing a duplicate record) using a file_feed adaptor.
+                  Connect the feed to the target dataset and wait for ingestion to complete.
+ * Expected Res : Success
+ * Date         : 24th Dec 2012
+ */
+
+use dataverse feeds_12;
+
+set wait-for-completion-feed "true";
+ 
+connect feed TweetFeed to dataset Tweets;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.3.query.aql
new file mode 100644
index 0000000..387f2ac
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_12/feeds_12.3.query.aql
@@ -0,0 +1,11 @@
+/*
+ * Description  : Create a feed from the contents of a file (containing a duplicate record) using a file_feed adaptor.
+                  Verify that all but the duplicate record are inserted into the target dataset.
+ * Expected Res : Success
+ * Date         : 24th Dec 2012
+ */
+use dataverse feeds_12;
+
+for $x in dataset('Tweets')
+order by $x.id
+return $x

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index 896a250..1306499 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1,2 +1,2 @@
-[ { "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }}, "Function": null, "Timestamp": "Tue Sep 24 22:30:47 PDT 2013" }
+[ { "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": null, "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:58 PDT 2015" }
  ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index 9dfeb95..177a0f1 100644
--- a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1,2 +1,2 @@
-[ { "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }}, "Function": "feeds.feed_processor@1", "Timestamp": "Tue Sep 24 22:35:03 PDT 2013" }
+[ { "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": "feed_processor", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "SecondaryTypeDetails": null, "Timestamp": "Sat Jun 20 13:55:59 PDT 2015" }
  ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/results/feeds/feeds_09/feeds_09.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_09/feeds_09.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_09/feeds_09.1.adm
new file mode 100644
index 0000000..3c13d5f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_09/feeds_09.1.adm
@@ -0,0 +1,2 @@
+[ 1
+ ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/results/feeds/feeds_10/feeds_10.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_10/feeds_10.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_10/feeds_10.1.adm
new file mode 100644
index 0000000..3904598
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_10/feeds_10.1.adm
@@ -0,0 +1,2 @@
+[ 12
+ ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/results/feeds/feeds_11/feeds_11.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_11/feeds_11.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_11/feeds_11.1.adm
new file mode 100644
index 0000000..00ec0ac
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_11/feeds_11.1.adm
@@ -0,0 +1,13 @@
+[ { "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+, { "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson  uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+, { "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+, { "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012" }
+, { "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+, { "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+, { "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+, { "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+, { "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+, { "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+, { "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
+, { "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
+ ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/results/feeds/feeds_12/feeds_12.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feeds_12/feeds_12.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_12/feeds_12.1.adm
new file mode 100644
index 0000000..4905871
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feeds_12/feeds_12.1.adm
@@ -0,0 +1,12 @@
+[ { "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+, { "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson  uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+, { "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+, { "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012" }
+, { "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+, { "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+, { "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+, { "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+, { "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+, { "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+, { "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
+ ]

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 575827e..59e76e0 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -6085,29 +6085,47 @@
                 <output-dir compare="Text">feeds_04</output-dir>
             </compilation-unit>
         </test-case>
-        <test-case FilePath="feeds">
-            <compilation-unit name="feeds_05">
-                <output-dir compare="Text">feeds_05</output-dir>
-            </compilation-unit>
-        </test-case>
+        
         <!--Disable it because of sporadic failures. Raman will re-enable it.
         <test-case FilePath="feeds">
           <compilation-unit name="feeds_06">
             <output-dir compare="Text">feeds_06</output-dir>
           </compilation-unit>
         </test-case>
-        -->
         <test-case FilePath="feeds">
             <compilation-unit name="feeds_07">
                 <output-dir compare="Text">feeds_07</output-dir>
             </compilation-unit>
         </test-case>
+        -->
         <test-case FilePath="feeds">
             <compilation-unit name="feeds_08">
                 <output-dir compare="Text">feeds_08</output-dir>
             </compilation-unit>
         </test-case>
         <test-case FilePath="feeds">
+            <compilation-unit name="feeds_09">
+                <output-dir compare="Text">feeds_09</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_10">
+                <output-dir compare="Text">feeds_10</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_11">
+                <output-dir compare="Text">feeds_11</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_12">
+                <output-dir compare="Text">feeds_12</output-dir>
+            </compilation-unit>
+        </test-case>
+        
+        
+        <test-case FilePath="feeds">
             <compilation-unit name="issue_230_feeds">
                 <output-dir compare="Text">issue_230_feeds</output-dir>
             </compilation-unit>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
index abac9df..b9c96d1 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
@@ -37,10 +37,14 @@ public interface Statement extends IAqlExpression {
         INDEX_DECL,
         CREATE_DATAVERSE,
         INDEX_DROP,
-        CREATE_FEED,
+        CREATE_PRIMARY_FEED,
+        CREATE_SECONDARY_FEED,
         DROP_FEED,
         CONNECT_FEED,
         DISCONNECT_FEED,
+        SUBSCRIBE_FEED,
+        CREATE_FEED_POLICY,
+        DROP_FEED_POLICY,
         CREATE_FUNCTION,
         FUNCTION_DROP,
         COMPACT, 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index 25acff8..5d9794e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -32,9 +32,10 @@ import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -75,6 +76,7 @@ public class ConnectFeedStatement implements Statement {
         this.varCounter = varCounter;
     }
 
+    /*
     public void initialize(MetadataTransactionContext mdTxnCtx, Dataset targetDataset, Feed sourceFeed)
             throws MetadataException {
         query = new Query();
@@ -91,9 +93,9 @@ public class ConnectFeedStatement implements Statement {
             }
         }
 
-        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+        Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
         try {
-            factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
+            factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) sourceFeed, mdTxnCtx);
             adapterOutputType = factoryOutput.second.getTypeName();
         } catch (AlgebricksException ae) {
             ae.printStackTrace();
@@ -135,7 +137,7 @@ public class ConnectFeedStatement implements Statement {
             throw new MetadataException(pe);
         }
 
-    }
+    }*/
 
     public Identifier getDataverseName() {
         return dataverseName;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java
new file mode 100644
index 0000000..939d777
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+/**
+ * AQL statement for creating a feed policy.
+ * <p>
+ * Two construction forms exist: one takes the name of a source policy plus a
+ * property map, the other takes a source policy file. Inputs that the chosen
+ * form does not accept are stored as {@code null} and returned as such by the
+ * corresponding getters.
+ */
+public class CreateFeedPolicyStatement implements Statement {
+
+    private final String policyName;
+    private final String sourcePolicyName;
+    private final String sourcePolicyFile;
+    private final Map<String, String> properties;
+    private final String description;
+    private final boolean ifNotExists;
+
+    /**
+     * Builds the statement from a named source policy and a property map.
+     * The source policy file is {@code null} for statements built this way.
+     */
+    public CreateFeedPolicyStatement(String policyName, String sourcePolicyName, Map<String, String> properties,
+            String description, boolean ifNotExists) {
+        this(policyName, sourcePolicyName, null, properties, description, ifNotExists);
+    }
+
+    /**
+     * Builds the statement from a source policy file.
+     * The source policy name and the property map are {@code null} for statements built this way.
+     */
+    public CreateFeedPolicyStatement(String policyName, String sourcePolicyFile, String description, boolean ifNotExists) {
+        this(policyName, null, sourcePolicyFile, null, description, ifNotExists);
+    }
+
+    /** Assigns every field; both public constructors delegate here. */
+    private CreateFeedPolicyStatement(String policyName, String sourcePolicyName, String sourcePolicyFile,
+            Map<String, String> properties, String description, boolean ifNotExists) {
+        this.policyName = policyName;
+        this.sourcePolicyName = sourcePolicyName;
+        this.sourcePolicyFile = sourcePolicyFile;
+        this.properties = properties;
+        this.description = description;
+        this.ifNotExists = ifNotExists;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Statement.Kind.CREATE_FEED_POLICY;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreateFeedPolicyStatement(this, arg);
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+        visitor.visit(this, arg);
+    }
+
+    /** @return the name of the policy being created */
+    public String getPolicyName() {
+        return this.policyName;
+    }
+
+    /** @return the source policy name, or {@code null} when the statement was built from a policy file */
+    public String getSourcePolicyName() {
+        return this.sourcePolicyName;
+    }
+
+    /** @return the source policy file, or {@code null} when the statement was built from a source policy name */
+    public String getSourcePolicyFile() {
+        return this.sourcePolicyFile;
+    }
+
+    /** @return the property map, or {@code null} when the statement was built from a policy file */
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    /** @return the description passed at construction */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** @return the {@code ifNotExists} flag passed at construction */
+    public boolean getIfNotExists() {
+        return ifNotExists;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
index 4e33f73..e189fd4 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
@@ -14,47 +14,32 @@
  */
 package edu.uci.ics.asterix.aql.expression;
 
-import java.util.Map;
-
 import edu.uci.ics.asterix.aql.base.Statement;
 import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
 import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 
-public class CreateFeedStatement implements Statement {
+public abstract class CreateFeedStatement implements Statement {
 
-    private final Identifier dataverseName;
-    private final Identifier feedName;
-    private final String adapterName;
-    private final Map<String, String> adapterConfiguration;
+    private final Pair<Identifier, Identifier> qName;
     private final FunctionSignature appliedFunction;
     private final boolean ifNotExists;
 
-    public CreateFeedStatement(Identifier dataverseName, Identifier feedName, String adapterName,
-            Map<String, String> adapterConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) {
-        this.feedName = feedName;
-        this.dataverseName = dataverseName;
-        this.adapterName = adapterName;
-        this.adapterConfiguration = adapterConfiguration;
+    public CreateFeedStatement(Pair<Identifier, Identifier> qName, FunctionSignature appliedFunction,
+            boolean ifNotExists) {
+        this.qName = qName;
         this.appliedFunction = appliedFunction;
         this.ifNotExists = ifNotExists;
     }
 
     public Identifier getDataverseName() {
-        return dataverseName;
+        return qName.first;
     }
 
     public Identifier getFeedName() {
-        return feedName;
-    }
-
-    public String getAdapterName() {
-        return adapterName;
-    }
-
-    public Map<String, String> getAdapterConfiguration() {
-        return adapterConfiguration;
+        return qName.second;
     }
 
     public FunctionSignature getAppliedFunction() {
@@ -66,14 +51,10 @@ public class CreateFeedStatement implements Statement {
     }
 
     @Override
-    public Kind getKind() {
-        return Kind.CREATE_FEED;
-    }
+    public abstract Kind getKind();
 
     @Override
-    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
-        return visitor.visitCreateFeedStatement(this, arg);
-    }
+    public abstract <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException;
 
     @Override
     public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java
new file mode 100644
index 0000000..810e508
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * AQL statement for creating a primary feed.
+ * <p>
+ * Extends the common {@link CreateFeedStatement} state with the name of an
+ * adaptor and the adaptor's configuration map.
+ */
+public class CreatePrimaryFeedStatement extends CreateFeedStatement implements Statement {
+
+    private final String adaptorName;
+    private final Map<String, String> adaptorConfiguration;
+
+    /**
+     * Passes the qualified name, applied function and {@code ifNotExists} flag
+     * on to {@link CreateFeedStatement} and records the adaptor details.
+     */
+    public CreatePrimaryFeedStatement(Pair<Identifier, Identifier> qName, String adaptorName,
+            Map<String, String> adaptorConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) {
+        super(qName, appliedFunction, ifNotExists);
+        this.adaptorConfiguration = adaptorConfiguration;
+        this.adaptorName = adaptorName;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_PRIMARY_FEED;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreatePrimaryFeedStatement(this, arg);
+    }
+
+    /** @return the adaptor name passed at construction */
+    public String getAdaptorName() {
+        return this.adaptorName;
+    }
+
+    /** @return the adaptor configuration map passed at construction (not copied) */
+    public Map<String, String> getAdaptorConfiguration() {
+        return this.adaptorConfiguration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java
new file mode 100644
index 0000000..7d5f72a
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * AQL statement for creating a secondary feed, i.e. a feed whose data is
+ * derived from another (primary or secondary) feed rather than from an adaptor.
+ */
+public class CreateSecondaryFeedStatement extends CreateFeedStatement implements Statement {
+
+    /** Qualified name (dataverse, feed) of the feed this secondary feed reads from. */
+    private final Pair<Identifier, Identifier> sourceQName;
+
+    /**
+     * Passes the qualified name, applied function and {@code ifNotExists} flag
+     * on to {@link CreateFeedStatement} and records the source feed's qualified name.
+     */
+    public CreateSecondaryFeedStatement(Pair<Identifier, Identifier> qName, Pair<Identifier, Identifier> sourceQName,
+            FunctionSignature appliedFunction, boolean ifNotExists) {
+        super(qName, appliedFunction, ifNotExists);
+        this.sourceQName = sourceQName;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_SECONDARY_FEED;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreateSecondaryFeedStatement(this, arg);
+    }
+
+    /**
+     * Returns the dataverse of the source feed. When the source name carries no
+     * dataverse part, the dataverse of this statement is used instead.
+     *
+     * @return the dataverse name, or {@code null} when neither is available
+     */
+    public String getSourceFeedDataverse() {
+        Identifier explicitDataverse = sourceQName.first;
+        if (explicitDataverse != null) {
+            return explicitDataverse.toString();
+        }
+        Identifier ownDataverse = getDataverseName();
+        if (ownDataverse == null) {
+            return null;
+        }
+        return ownDataverse.getValue();
+    }
+
+    /** @return the source feed's name, or {@code null} when the source name has no feed part */
+    public String getSourceFeedName() {
+        Identifier sourceFeed = sourceQName.second;
+        if (sourceFeed == null) {
+            return null;
+        }
+        return sourceFeed.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java
new file mode 100644
index 0000000..a9e9ee1
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+/**
+ * AQL statement for dropping a feed policy, identified by dataverse and policy name.
+ * <p>
+ * Instances are immutable: all fields are assigned once in the constructor.
+ */
+public class FeedPolicyDropStatement implements Statement {
+
+    private final Identifier dataverseName;
+    private final Identifier policyName;
+    // Never reassigned after construction, so it is final like the other fields.
+    private final boolean ifExists;
+
+    /**
+     * @param dataverseName
+     *            dataverse part of the policy's name
+     * @param policyName
+     *            name of the policy to drop
+     * @param ifExists
+     *            flag returned unchanged by {@link #getIfExists()}
+     */
+    public FeedPolicyDropStatement(Identifier dataverseName, Identifier policyName, boolean ifExists) {
+        this.dataverseName = dataverseName;
+        this.policyName = policyName;
+        this.ifExists = ifExists;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.DROP_FEED_POLICY;
+    }
+
+    /** @return the dataverse name passed at construction */
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    /** @return the policy name passed at construction */
+    public Identifier getPolicyName() {
+        return policyName;
+    }
+
+    /** @return the {@code ifExists} flag passed at construction */
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitDropFeedPolicyStatement(this, arg);
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+        visitor.visit(this, arg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java
new file mode 100644
index 0000000..509a69a
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.io.StringReader;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.parser.ParseException;
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+
+/**
+ * Represents the AQL statement for subscribing to a feed.
+ * This AQL statement is private and may not be used by the end-user.
+ * <p>
+ * Instances are constructed directly from a {@link FeedConnectionRequest}.
+ * {@link #initialize(MetadataTransactionContext)} builds the text of an
+ * {@code insert into dataset ... feed-collect(...)} query from that request,
+ * parses it, and keeps the resulting {@link Query}.
+ */
+public class SubscribeFeedStatement implements Statement {
+
+    private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
+    private final FeedConnectionRequest connectionRequest;
+    private Query query;
+    private int varCounter;
+    private final String[] locations;
+
+    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
+
+    /**
+     * @param locations
+     *            returned unchanged by {@link #getLocations()}
+     * @param subscriptionRequest
+     *            the connection request this statement is built from
+     */
+    public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest) {
+        this.connectionRequest = subscriptionRequest;
+        this.varCounter = 0;
+        this.locations = locations;
+    }
+
+    /**
+     * Translates the connection request into an AQL insert query and stores the parsed query.
+     * <p>
+     * The generated text consists of four statements, in this order: {@code use dataverse},
+     * two {@code set} statements, and the {@code insert into dataset} statement whose query
+     * is retained.
+     *
+     * @param mdTxnCtx
+     *            metadata transaction used for the feed and function lookups
+     * @throws MetadataException
+     *             if a required function is unknown, takes more than one argument, or the
+     *             generated text fails to parse
+     * @throws IllegalStateException
+     *             if the receiving (subscriber) feed is not found in the metadata
+     */
+    public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
+        this.query = new Query();
+        FeedId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
+        FeedId receivingFeedId = connectionRequest.getReceivingFeedId();
+        Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, receivingFeedId.getDataverse(),
+                receivingFeedId.getFeedName());
+        if (subscriberFeed == null) {
+            // Report the identity that was looked up; the lookup result itself is null here.
+            throw new IllegalStateException(" Subscriber feed " + receivingFeedId.getDataverse() + "."
+                    + receivingFeedId.getFeedName() + " not found.");
+        }
+
+        String feedOutputType = getOutputType(mdTxnCtx);
+        FunctionSignature appliedFunction = subscriberFeed.getAppliedFunction();
+        Function function = null;
+        if (appliedFunction != null) {
+            function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+            if (function == null) {
+                // Name the signature that failed to resolve, not the null lookup result.
+                throw new MetadataException(" Unknown function " + appliedFunction);
+            } else if (function.getParams().size() > 1) {
+                throw new MetadataException(" Incompatible function: " + appliedFunction
+                        + " Number of arguments must be 1");
+            }
+        }
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n");
+        builder.append("set " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " '" + Boolean.TRUE + "';\n");
+        builder.append("set " + FeedActivity.FeedActivityDetails.FEED_POLICY_NAME + " '"
+                + connectionRequest.getPolicy() + "';\n");
+
+        builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " ");
+        builder.append(" ( for $x in feed-collect ('" + sourceFeedId.getDataverse() + "','"
+                + sourceFeedId.getFeedName() + "','" + receivingFeedId.getFeedName() + "','"
+                + connectionRequest.getSubscriptionLocation().name() + "','"
+                + connectionRequest.getTargetDataset() + "','" + feedOutputType + "')");
+
+        List<String> functionsToApply = connectionRequest.getFunctionsToApply();
+        // A missing list is treated like an empty one: the collected record is returned as is.
+        if (functionsToApply == null || functionsToApply.isEmpty()) {
+            builder.append(" return $x");
+        } else {
+            String rValueName = "x";
+            String lValueName = "y";
+            int variableIndex = 0;
+            for (String functionName : functionsToApply) {
+                // NOTE(review): every iteration looks up the subscriber feed's applied function
+                // rather than functionName -- confirm whether that is intended.
+                function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+                if (function == null) {
+                    throw new MetadataException(" Unknown function " + appliedFunction
+                            + " (needed to apply " + functionName + ")");
+                }
+                variableIndex++;
+                switch (function.getLanguage().toUpperCase()) {
+                    case Function.LANGUAGE_AQL:
+                        // NOTE(review): unlike the Java branch, rValueName is not advanced here -- confirm.
+                        builder.append(" let " + "$" + lValueName + variableIndex + ":=(" + function.getFunctionBody()
+                                + ")");
+                        builder.append("\n");
+                        break;
+                    case Function.LANGUAGE_JAVA:
+                        builder.append(" let " + "$" + lValueName + variableIndex + ":=" + functionName + "(" + "$"
+                                + rValueName + ")");
+                        // The next function in the chain consumes the variable bound by this one.
+                        rValueName = lValueName + variableIndex;
+                        break;
+                }
+                builder.append("\n");
+            }
+            builder.append("return $" + lValueName + variableIndex);
+        }
+        builder.append(")");
+        builder.append(";");
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Connect feed statement translated to\n" + builder.toString());
+        }
+        AQLParser parser = new AQLParser(new StringReader(builder.toString()));
+
+        List<Statement> statements;
+        try {
+            statements = parser.Statement();
+            // Index 3: the insert follows the "use dataverse" and the two "set" statements built above.
+            query = ((InsertStatement) statements.get(3)).getQuery();
+        } catch (ParseException pe) {
+            throw new MetadataException(pe);
+        }
+
+    }
+
+    /** @return the query produced by {@link #initialize}; {@code null} before it has been called */
+    public Query getQuery() {
+        return query;
+    }
+
+    /** @return the variable counter, which this class sets to 0 at construction and never changes */
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.SUBSCRIBE_FEED;
+    }
+
+    /** @return the policy carried by the connection request */
+    public String getPolicy() {
+        return connectionRequest.getPolicy();
+    }
+
+    /** @return the connection request this statement was built from */
+    public FeedConnectionRequest getSubscriptionRequest() {
+        return connectionRequest;
+    }
+
+    /** Does not dispatch to the visitor; always returns {@code null}. */
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /** Does not dispatch to the visitor; the body is empty. */
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+    }
+
+    /** @return the dataverse of the receiving feed named in the connection request */
+    public String getDataverseName() {
+        return connectionRequest.getReceivingFeedId().getDataverse();
+    }
+
+    /**
+     * Determines the output type name of the receiving feed.
+     * For a primary feed it is the type name of the record type returned with the adapter
+     * factory; for a secondary feed it is whatever {@code FeedUtil.getSecondaryFeedOutput} returns.
+     *
+     * @return the output type name, or {@code null} if the feed type matches neither case
+     * @throws MetadataException
+     *             wrapping any {@link AlgebricksException} raised by the lookups
+     */
+    private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException {
+        String outputType = null;
+        FeedId feedId = connectionRequest.getReceivingFeedId();
+        Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName());
+        FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters());
+        try {
+            switch (feed.getFeedType()) {
+                case PRIMARY:
+                    Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+
+                    factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
+                            mdTxnCtx);
+                    outputType = factoryOutput.second.getTypeName();
+                    break;
+                case SECONDARY:
+                    outputType = FeedUtil.getSecondaryFeedOutput((SecondaryFeed) feed, policyAccessor, mdTxnCtx);
+                    break;
+            }
+            return outputType;
+
+        } catch (AlgebricksException ae) {
+            throw new MetadataException(ae);
+        }
+    }
+
+    /** @return the locations array passed at construction (not copied) */
+    public String[] getLocations() {
+        return locations;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
index 21a2388..28acfda 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
@@ -25,6 +25,7 @@ import edu.uci.ics.asterix.aql.expression.CallExpr;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
@@ -37,6 +38,7 @@ import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -61,13 +63,13 @@ import edu.uci.ics.asterix.aql.expression.OperatorType;
 import edu.uci.ics.asterix.aql.expression.OrderbyClause;
 import edu.uci.ics.asterix.aql.expression.OrderbyClause.OrderModifier;
 import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
 import edu.uci.ics.asterix.aql.expression.QuantifiedPair;
 import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RecordConstructor;
 import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition;
 import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition.RecordKind;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.SetStatement;
 import edu.uci.ics.asterix.aql.expression.TypeDecl;
 import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
@@ -557,6 +559,15 @@ public class AQLPrintVisitor implements IAqlVisitorWithVoidReturn<Integer> {
         // TODO Auto-generated method stub
 
     }
+    
+    @Override
+    public void visit(CreateFeedPolicyStatement stmt, Integer arg) throws AsterixException {
+        // TODO: printing is not implemented for create-feed-policy statements; visiting one emits nothing.
+    }
+
+    @Override
+    public void visit(FeedPolicyDropStatement stmt, Integer arg) throws AsterixException {
+        // Empty body: visiting a drop-feed-policy statement emits nothing.
+    }
 
     @Override
     public void visit(RunStatement stmt, Integer arg) throws AsterixException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
index 84d9726..6716fff 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
@@ -15,21 +15,24 @@
 package edu.uci.ics.asterix.aql.expression.visitor;
 
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
 import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
-import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
+import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.ForClause;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -157,9 +160,15 @@ public interface IAqlExpressionVisitor<R, T> {
 
     R visitConnectFeedStatement(ConnectFeedStatement del, T arg) throws AsterixException;
 
-    R visitCreateFeedStatement(CreateFeedStatement del, T arg) throws AsterixException;
+    R visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement cpfs, T arg) throws AsterixException;
+
+    R visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement csfs, T arg) throws AsterixException;
 
     R visitDropFeedStatement(FeedDropStatement del, T arg) throws AsterixException;
+    
+    R visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, T arg) throws AsterixException;
+
+    R visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, T arg) throws AsterixException;
 
     R visitCallExpr(CallExpr pf, T arg) throws AsterixException;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
index f04c3a0..84e0825 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
@@ -18,6 +18,7 @@ import edu.uci.ics.asterix.aql.expression.CallExpr;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
@@ -30,6 +31,7 @@ import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.ForClause;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -49,11 +51,11 @@ import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
 import edu.uci.ics.asterix.aql.expression.OperatorExpr;
 import edu.uci.ics.asterix.aql.expression.OrderbyClause;
 import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
 import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RecordConstructor;
 import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.SetStatement;
 import edu.uci.ics.asterix.aql.expression.TypeDecl;
 import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
@@ -164,8 +166,12 @@ public interface IAqlVisitorWithVoidReturn<T> {
 
     void visit(CreateFeedStatement stmt, T arg) throws AsterixException;
 
+    void visit(CreateFeedPolicyStatement stmt, T arg) throws AsterixException;
+
     void visit(FeedDropStatement stmt, T arg) throws AsterixException;
 
+    void visit(FeedPolicyDropStatement stmt, T arg) throws AsterixException;
+
     void visit(CreateFunctionStatement cfs, T arg) throws AsterixException;
 
     void visit(FunctionDropStatement fds, T arg) throws AsterixException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
index c416fa6..6b1df98 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
@@ -25,21 +25,24 @@ import edu.uci.ics.asterix.aql.base.Clause;
 import edu.uci.ics.asterix.aql.base.Expression;
 import edu.uci.ics.asterix.aql.base.Expression.Kind;
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -89,8 +92,6 @@ import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.AsterixFunction;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 
 public final class AqlRewriter {
 
@@ -553,8 +554,13 @@ public final class AqlRewriter {
         }
 
         @Override
-        public Void visitCreateFeedStatement(CreateFeedStatement del, Void arg) throws AsterixException {
-            // TODO Auto-generated method stub
+        public Void visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del, Void arg) throws AsterixException {
+            return null; // no-op: this visitor does nothing for a create-primary-feed statement
+        }
+
+        @Override
+        public Void visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del, Void arg)
+                throws AsterixException {
             return null;
         }
 
@@ -575,6 +581,18 @@ public final class AqlRewriter {
             // TODO Auto-generated method stub
             return null;
         }
+        
+        @Override
+        public Void visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, Void arg) throws AsterixException {
+            // TODO: auto-generated stub; currently a no-op that ignores the statement and returns null
+            return null;
+        }
+
+        @Override
+        public Void visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, Void arg) throws AsterixException {
+            // TODO: auto-generated stub; currently a no-op that ignores the statement and returns null
+            return null;
+        }
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
index edb080c..f03799a 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
@@ -22,21 +22,24 @@ import edu.uci.ics.asterix.aql.base.Clause;
 import edu.uci.ics.asterix.aql.base.Expression;
 import edu.uci.ics.asterix.aql.base.IAqlExpression;
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -588,8 +591,15 @@ public class CloneAndSubstituteVariablesVisitor implements
     }
 
     @Override
-    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateFeedStatement(CreateFeedStatement del,
-            List<VariableSubstitution> arg) throws AsterixException {
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreatePrimaryFeedStatement(
+            CreatePrimaryFeedStatement del, List<VariableSubstitution> arg) throws AsterixException {
+        // TODO: auto-generated stub; returns null rather than an (expression, substitutions) pair
+        return null;
+    }
+
+    @Override
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateSecondaryFeedStatement(
+            CreateSecondaryFeedStatement del, List<VariableSubstitution> arg) throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -614,4 +624,17 @@ public class CloneAndSubstituteVariablesVisitor implements
         // TODO Auto-generated method stub
         return null;
     }
+    
+
+    @Override
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateFeedPolicyStatement(
+            CreateFeedPolicyStatement cfps, List<VariableSubstitution> arg) throws AsterixException {
+        return null; // stub: no (expression, substitutions) pair is produced for this statement
+    }
+
+    @Override
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs,
+            List<VariableSubstitution> arg) throws AsterixException {
+        return null; // stub: no (expression, substitutions) pair is produced for this statement
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
index 58433d3..394b507 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
@@ -23,21 +23,24 @@ import edu.uci.ics.asterix.aql.base.Expression;
 import edu.uci.ics.asterix.aql.base.Expression.Kind;
 import edu.uci.ics.asterix.aql.base.IAqlExpression;
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -522,7 +525,15 @@ public class InlineUdfsVisitor implements IAqlExpressionVisitor<Boolean, List<Fu
     }
 
     @Override
-    public Boolean visitCreateFeedStatement(CreateFeedStatement del, List<FunctionDecl> arg) throws AsterixException {
+    public Boolean visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del, List<FunctionDecl> arg)
+            throws AsterixException {
+        // TODO: auto-generated stub; returns a null Boolean, so callers must not auto-unbox the result
+        return null;
+    }
+
+    @Override
+    public Boolean visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del, List<FunctionDecl> arg)
+            throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -544,4 +555,18 @@ public class InlineUdfsVisitor implements IAqlExpressionVisitor<Boolean, List<Fu
         // TODO Auto-generated method stub
         return null;
     }
+    
+    @Override
+    public Boolean visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, List<FunctionDecl> arg)
+            throws AsterixException {
+        // TODO: auto-generated stub; returns a null Boolean, so callers must not auto-unbox the result
+        return null;
+    }
+
+    @Override
+    public Boolean visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, List<FunctionDecl> arg)
+            throws AsterixException {
+        // TODO: auto-generated stub; returns a null Boolean, so callers must not auto-unbox the result
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 8150871..7044962 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -37,6 +37,9 @@ import edu.uci.ics.asterix.aql.expression.AbstractAccessor;
 import edu.uci.ics.asterix.aql.expression.CallExpr;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
@@ -336,6 +339,7 @@ Statement CreateStatement() throws ParseException:
     | stmt = DataverseSpecification()
     | stmt = FunctionSpecification()
     | stmt = FeedSpecification()
+    | stmt = FeedPolicySpecification()
   )
   {
     return stmt;
@@ -653,24 +657,61 @@ CreateFeedStatement FeedSpecification() throws ParseException:
   Map<String,String> properties = null;
   FunctionSignature appliedFunction = null;
   CreateFeedStatement cfs = null;
+  Pair<Identifier,Identifier> sourceNameComponents = null;
+  
 }
 {
   (
-    "feed"  nameComponents = QualifiedName()
-    ifNotExists = IfNotExists()
-    "using" adapterName = AdapterName() properties = Configuration()
-    (appliedFunction = ApplyFunction())?
+    "secondary" "feed"  nameComponents = QualifiedName() ifNotExists = IfNotExists()
+      <FROM> "feed" sourceNameComponents = QualifiedName() (appliedFunction = ApplyFunction())?
       {
-        cfs = new CreateFeedStatement(nameComponents.first,
-                                   nameComponents.second, adapterName, properties, appliedFunction, ifNotExists);
+        cfs = new CreateSecondaryFeedStatement(nameComponents,
+                                   sourceNameComponents, appliedFunction, ifNotExists);
       }
-
+     |
+     ("primary")? "feed" nameComponents = QualifiedName() ifNotExists = IfNotExists()
+      "using" adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())?  
+       {
+        cfs = new CreatePrimaryFeedStatement(nameComponents,
+                                    adapterName, properties, appliedFunction, ifNotExists);
+       }
   )
     {
       return cfs;
     }
 }
 
+CreateFeedPolicyStatement FeedPolicySpecification() throws ParseException:
+{ // Parses: "ingestion" "policy" <name> IfNotExists() <FROM> ( "policy" ... | "path" ... )
+  String policyName = null; // name of the policy being created; first constructor argument in both alternatives
+  String basePolicyName = null; // set only by the "policy" alternative
+  String sourcePolicyFile = null; // set only by the "path" alternative
+  String definition = null; // optional; follows the "definition" keyword in either alternative
+  boolean ifNotExists = false;
+  Map<String,String> properties = null; // result of Configuration(); only parsed in the "policy" alternative
+  CreateFeedPolicyStatement cfps = null;
+}
+{
+  (
+    "ingestion" "policy"  policyName = Identifier() ifNotExists = IfNotExists()
+      <FROM> 
+      ("policy" basePolicyName = Identifier() properties = Configuration() ("definition" definition = StringLiteral())?  
+      {
+        cfps = new CreateFeedPolicyStatement(policyName,
+                                   basePolicyName, properties, definition, ifNotExists);
+      }
+     | "path" sourcePolicyFile = Identifier() ("definition" definition = StringLiteral())?  
+       { // NOTE(review): the "path" operand is parsed with Identifier(), not StringLiteral() - confirm intended
+        cfps = new CreateFeedPolicyStatement(policyName, sourcePolicyFile, definition, ifNotExists);
+       }
+     ) 
+       
+  )
+    {
+      return cfps;
+    }
+}
+
 
 
 List<VarIdentifier> ParameterList() throws ParseException:

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
index 8059ede..73c61f0 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -20,7 +20,7 @@ import java.util.concurrent.Executor;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.IIOManager;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterEventsSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterEventsSubscriber.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterEventsSubscriber.java
new file mode 100644
index 0000000..23b5fd2
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterEventsSubscriber.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.asterix.common.api;
+
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
+
+public interface IClusterEventsSubscriber {
+    // Callbacks for cluster events: node failure, node join, work-request completion and state change.
+    /**
+     * @param deadNodeIds ids of the nodes being reported as failed
+     * @return a set of cluster management work items (NOTE(review): presumably work requested in response - confirm)
+     */
+    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds);
+
+    /**
+     * @param joinedNodeId id of the node being reported as having joined
+     * @return a set of cluster management work items (NOTE(review): presumably work requested in response - confirm)
+     */
+    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
+
+    /**
+     * @param response the response object for a cluster management work request; exposes the work and its status
+     */
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response);
+
+    /**
+     * @param previousState the cluster state before the change
+     * @param newState the cluster state after the change
+     */
+    public void notifyStateChange(ClusterState previousState, ClusterState newState);
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWork.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWork.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWork.java
new file mode 100644
index 0000000..dc7a69c
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWork.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+public interface IClusterManagementWork {
+    // Describes one unit of cluster management work: its type, an integer id and an associated subscriber.
+    public enum WorkType {
+        ADD_NODE,
+        REMOVE_NODE
+    }
+    /** Cluster states, as passed to {@link IClusterEventsSubscriber#notifyStateChange}. */
+    public enum ClusterState {
+        ACTIVE,
+        UNUSABLE
+    }
+    /** @return the kind of work this item represents (one of the {@link WorkType} constants) */
+    public WorkType getClusterManagementWorkType();
+    /** @return the integer id of this work item */
+    public int getWorkId();
+    /** @return the associated subscriber (NOTE(review): presumably the one that submitted this work - confirm) */
+    public IClusterEventsSubscriber getSourceSubscriber();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWorkResponse.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWorkResponse.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWorkResponse.java
new file mode 100644
index 0000000..25fcda0
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IClusterManagementWorkResponse.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.asterix.common.api;
+
+
+public interface IClusterManagementWorkResponse {
+    // Response to a piece of cluster management work: pairs the work item with a mutable status.
+    public enum Status {
+        IN_PROGRESS,
+        SUCCESS,
+        FAILURE
+    }
+
+    /**
+     * @return the work item this response belongs to
+     */
+    public IClusterManagementWork getWork();
+
+    /**
+     * @return the current status of this response (one of the {@link Status} constants)
+     */
+    public Status getStatus();
+
+    /**
+     * @param status the status to record on this response
+     */
+    public void setStatus(Status status);
+
+}