Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:20 UTC

[23/24] incubator-asterixdb git commit: Introduces Feeds 2.0
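Context for the hunks below: Feeds 2.0 splits feed creation into primary feeds (which ingest from an external adaptor) and secondary feeds (which derive from another feed), and adds named ingestion policies; AqlTranslator gains CREATE_PRIMARY_FEED, CREATE_SECONDARY_FEED, CREATE_FEED_POLICY, DROP_FEED_POLICY, and SUBSCRIBE_FEED cases. The AQL sketch below only illustrates the DDL surface this implies; the adaptor, function, dataset, and policy names are placeholder assumptions, not taken from this patch.

    // Hypothetical AQL sketch; identifiers and adaptor parameters are placeholders.
    create primary feed TwitterFeed using push_twitter (("type-name"="Tweet"));
    create secondary feed ProcessedTwitterFeed from feed TwitterFeed
        apply function addFeatures;
    connect feed ProcessedTwitterFeed to dataset ProcessedTweets using policy Basic;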

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
deleted file mode 100644
index 5cb53b2..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.api.http.servlet;
-
-import java.awt.image.BufferedImage;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.imageio.ImageIO;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-
-public class FeedDashboardServlet extends HttpServlet {
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(FeedDashboardServlet.class.getName());
-
-    @Override
-    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
-        String resourcePath = null;
-        String requestURI = request.getRequestURI();
-
-        if (requestURI.equals("/")) {
-            response.setContentType("text/html");
-            resourcePath = "/feed/dashboard.html";
-        } else {
-            resourcePath = requestURI + ".html";
-        }
-
-        try {
-            InputStream is = FeedDashboardServlet.class.getResourceAsStream(resourcePath);
-            if (is == null) {
-                response.sendError(HttpServletResponse.SC_NOT_FOUND);
-                return;
-            }
-
-            // Special handler for font files and .png resources
-            if (resourcePath.endsWith(".png")) {
-
-                BufferedImage img = ImageIO.read(is);
-                OutputStream outputStream = response.getOutputStream();
-                String formatName = "png";
-                response.setContentType("image/png");
-                ImageIO.write(img, formatName, outputStream);
-                outputStream.close();
-                return;
-
-            }
-
-            response.setCharacterEncoding("utf-8");
-            InputStreamReader isr = new InputStreamReader(is);
-            StringBuilder sb = new StringBuilder();
-            BufferedReader br = new BufferedReader(isr);
-            String line = br.readLine();
-
-            while (line != null) {
-                sb.append(line + "\n");
-                line = br.readLine();
-            }
-
-            String feedName = request.getParameter("feed");
-            String datasetName = request.getParameter("dataset");
-            String dataverseName = request.getParameter("dataverse");
-
-            FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
-
-            String outStr = null;
-            if (requestURI.startsWith("/webui/static")) {
-                outStr = sb.toString();
-            } else {
-                MetadataManager.INSTANCE.init();
-                MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
-                FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId,
-                        FeedActivityType.FEED_BEGIN);
-                MetadataManager.INSTANCE.commitTransaction(ctx);
-
-                Map<String, String> activityDetails = activity.getFeedActivityDetails();
-
-                String host = activityDetails.get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST);
-                int port = Integer.parseInt(activityDetails
-                        .get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT));
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Super Feed Manager address: " + host + "[" + port + "]");
-                }
-
-                String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
-                String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
-                String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
-                String ingestionPolicy = activityDetails.get(FeedActivityDetails.FEED_POLICY_NAME);
-                String activeSince = activity.getLastUpdatedTimestamp();
-
-                outStr = String.format(sb.toString(), dataverseName, datasetName, feedName, ingestLocations,
-                        computeLocations, storageLocations, ingestionPolicy, activeSince);
-                FeedServletUtil.initiateSubscription(feedId, host, port);
-            }
-
-            PrintWriter out = response.getWriter();
-            out.println(outStr);
-        } catch (ACIDException | MetadataException e) {
-            e.printStackTrace();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
deleted file mode 100644
index 463ce01..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.api.http.servlet;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
-
-public class FeedDataProviderServlet extends HttpServlet {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
-
-        String feedName = request.getParameter("feed");
-        String datasetName = request.getParameter("dataset");
-        String dataverseName = request.getParameter("dataverse");
-
-        String report = getFeedReport(feedName, datasetName, dataverseName);
-        System.out.println(" REPORT " + report);
-        long timestamp = System.currentTimeMillis();
-        JSONObject obj = null;
-        if (report != null) {
-            JSONArray array = new JSONArray();
-            try {
-                obj = new JSONObject();
-                obj.put("type", "report");
-                obj.put("time", timestamp);
-                obj.put("value", report);
-            } catch (JSONException jsoe) {
-                throw new IOException(jsoe);
-            }
-        } else {
-            obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
-        }
-
-        PrintWriter out = response.getWriter();
-        out.println(obj.toString());
-    }
-
-    private String getFeedReport(String feedName, String datasetName, String dataverseName) {
-        FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
-        LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
-        String report = null;
-        try {
-            report = queue.poll(25, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return report;
-    }
-
-    private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
-        JSONObject obj = new JSONObject();
-        try {
-            MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
-            List<FeedActivity> feedActivities = MetadataManager.INSTANCE
-                    .getActiveFeeds(ctx, dataverseName, datasetName);
-            FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
-            FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId, null);
-            switch (activity.getActivityType()) {
-                case FEED_BEGIN:
-                    Map<String, String> activityDetails = activity.getFeedActivityDetails();
-                    String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
-                    String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
-                    String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
-                    obj.put("status", "active");
-                    obj.put("type", "reload");
-                    obj.put("ingestLocations", ingestLocations);
-                    obj.put("computeLocations", computeLocations);
-                    obj.put("storageLocations", storageLocations);
-                    System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
-                            + computeLocations + " storage at " + storageLocations);
-                    break;
-                case FEED_FAILURE:
-                    obj.put("status", "failed");
-                    break;
-                case FEED_END:
-                    obj.put("status", "ended");
-                    break;
-            }
-        } catch (Exception e) {
-            // ignore
-        }
-        return obj;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
index 96b3c31..adca4c6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
@@ -21,19 +21,20 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.util.List;
+import java.util.Collection;
 
 import javax.imageio.ImageIO;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
 
 public class FeedServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
@@ -50,73 +51,116 @@ public class FeedServlet extends HttpServlet {
             resourcePath = requestURI;
         }
 
-        try {
-            InputStream is = FeedServlet.class.getResourceAsStream(resourcePath);
-            if (is == null) {
-                response.sendError(HttpServletResponse.SC_NOT_FOUND);
-                return;
-            }
+        InputStream is = FeedServlet.class.getResourceAsStream(resourcePath);
+        if (is == null) {
+            response.sendError(HttpServletResponse.SC_NOT_FOUND);
+            return;
+        }
 
-            // Special handler for font files and .png resources
-            if (resourcePath.endsWith(".png")) {
+        // Special handler for font files and .png resources
+        if (resourcePath.endsWith(".png")) {
 
-                BufferedImage img = ImageIO.read(is);
-                OutputStream outputStream = response.getOutputStream();
-                String formatName = "png";
-                response.setContentType("image/png");
-                ImageIO.write(img, formatName, outputStream);
-                outputStream.close();
-                return;
+            BufferedImage img = ImageIO.read(is);
+            OutputStream outputStream = response.getOutputStream();
+            String formatName = "png";
+            response.setContentType("image/png");
+            ImageIO.write(img, formatName, outputStream);
+            outputStream.close();
+            return;
 
-            }
+        }
 
-            response.setCharacterEncoding("utf-8");
-            InputStreamReader isr = new InputStreamReader(is);
-            StringBuilder sb = new StringBuilder();
-            BufferedReader br = new BufferedReader(isr);
-            String line = br.readLine();
+        response.setCharacterEncoding("utf-8");
+        InputStreamReader isr = new InputStreamReader(is);
+        StringBuilder sb = new StringBuilder();
+        BufferedReader br = new BufferedReader(isr);
+        String line = br.readLine();
 
-            while (line != null) {
-                sb.append(line + "\n");
-                line = br.readLine();
-            }
+        while (line != null) {
+            sb.append(line + "\n");
+            line = br.readLine();
+        }
 
-            String outStr = null;
-            if (requestURI.startsWith("/webui/static")) {
-                outStr = sb.toString();
+        String outStr = null;
+        if (requestURI.startsWith("/webui/static")) {
+            outStr = sb.toString();
+        } else {
+            Collection<FeedActivity> lfa = CentralFeedManager.getInstance().getFeedLoadManager().getFeedActivities();
+            StringBuilder ldStr = new StringBuilder();
+            ldStr.append("<br />");
+            ldStr.append("<br />");
+            if (lfa == null || lfa.isEmpty()) {
+                ldStr.append("Currently there are no active feeds in AsterixDB");
             } else {
-                MetadataManager.INSTANCE.init();
-                MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
-                List<FeedActivity> lfa = MetadataManager.INSTANCE.getActiveFeeds(ctx, null, null);
-                StringBuilder ldStr = new StringBuilder();
-                ldStr.append("<br />");
-                ldStr.append("<br />");
-                if (lfa == null || lfa.isEmpty()) {
-                    ldStr.append("Currently there are no active feeds in the Asterix");
-                } else {
-                    ldStr.append("Active Feeds");
-                }
-                FeedConnectionId feedId = null;
-                for (FeedActivity feedActivity : lfa) {
-                    feedId = new FeedConnectionId(feedActivity.getDataverseName(), feedActivity.getFeedName(),
-                            feedActivity.getDatasetName());
-                    ldStr.append("<br />");
-                    ldStr.append("<br />");
-                    ldStr.append("<a href=\"/feed/dashboard?dataverse=" + feedActivity.getDataverseName() + "&feed="
-                            + feedActivity.getFeedName() + "&dataset=" + feedActivity.getDatasetName() + "\">" + feedId
-                            + "</a>");
-                    ldStr.append("<br />");
-                }
-
-                outStr = String.format(sb.toString(), ldStr.toString());
-                MetadataManager.INSTANCE.commitTransaction(ctx);
-
+                ldStr.append("Active Feeds");
             }
+            insertTable(ldStr, lfa);
+            outStr = String.format(sb.toString(), ldStr.toString());
+
+        }
+
+        PrintWriter out = response.getWriter();
+        out.println(outStr);
+    }
 
-            PrintWriter out = response.getWriter();
-            out.println(outStr);
-        } catch (ACIDException | MetadataException e) {
+    private void insertTable(StringBuilder html, Collection<FeedActivity> list) {
+        html.append("<table style=\"width:100%\">");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_FEED_NAME + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_DATASET_NAME + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_ACTIVE_SINCE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_STAGE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_COMPUTE_STAGE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_STAGE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_RATE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_RATE + "</th>");
+        for (FeedActivity activity : list) {
+            insertRow(html, activity);
+        }
+    }
 
+    private void insertRow(StringBuilder html, FeedActivity activity) {
+        String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS);
+        String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS);
+        String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
+
+        IFeedLoadManager loadManager = CentralFeedManager.getInstance().getFeedLoadManager();
+        FeedConnectionId connectionId = new FeedConnectionId(new FeedId(activity.getDataverseName(),
+                activity.getFeedName()), activity.getDatasetName());
+        int intakeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.COLLECT) * intake.split(",").length;
+        int storeRate = loadManager.getOutflowRate(connectionId, FeedRuntimeType.STORE) * store.split(",").length;
+
+        html.append("<tr>");
+        html.append("<td>" + activity.getFeedName() + "</td>");
+        html.append("<td>" + activity.getDatasetName() + "</td>");
+        html.append("<td>" + activity.getConnectTimestamp() + "</td>");
+        //html.append("<td>" + insertLink(html, FeedDashboardServlet.getParameterizedURL(activity), "Details") + "</td>");
+        html.append("<td>" + intake + "</td>");
+        html.append("<td>" + compute + "</td>");
+        html.append("<td>" + store + "</td>");
+        String color = "black";
+        if (intakeRate > storeRate) {
+            color = "red";
+        }
+        if (intakeRate < 0) {
+            html.append("<td>" + "UNKNOWN" + "</td>");
+        } else {
+            html.append("<td>" + insertColoredText("" + intakeRate, color) + " rec/sec" + "</td>");
+        }
+        if (storeRate < 0) {
+            html.append("<td>" + "UNKNOWN" + "</td>");
+        } else {
+            html.append("<td>" + insertColoredText("" + storeRate, color) + " rec/sec" + "</td>");
         }
+        html.append("</tr>");
+    }
+
+    private String insertLink(StringBuilder html, String url, String displayText) {
+        return ("<a href=\"" + url + "\">" + displayText + "</a>");
+    }
+
+    private String insertColoredText(String s, String color) {
+        return "<font color=\"" + color + "\">" + s + "</font>";
     }
 }
+
+

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
index ff29a23..8567810 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
@@ -9,7 +9,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
 import edu.uci.ics.asterix.metadata.feeds.RemoteSocketMessageListener;
 
 public class FeedServletUtil {
@@ -17,6 +17,17 @@ public class FeedServletUtil {
     private static final Logger LOGGER = Logger.getLogger(FeedServletUtil.class.getName());
     private static final char EOL = (char) "\n".getBytes()[0];
 
+    public static final class Constants {
+        public static final String TABLE_HEADER_FEED_NAME = "Feed";
+        public static final String TABLE_HEADER_DATASET_NAME = "Dataset";
+        public static final String TABLE_HEADER_ACTIVE_SINCE = "Timestamp";
+        public static final String TABLE_HEADER_INTAKE_STAGE = "Intake Stage";
+        public static final String TABLE_HEADER_COMPUTE_STAGE = "Compute Stage";
+        public static final String TABLE_HEADER_STORE_STAGE = "Store Stage";
+        public static final String TABLE_HEADER_INTAKE_RATE = "Intake";
+        public static final String TABLE_HEADER_STORE_RATE = "Store";
+    }
+
     public static void initiateSubscription(FeedConnectionId feedId, String host, int port) throws IOException {
         LinkedBlockingQueue<String> outbox = new LinkedBlockingQueue<String>();
         int subscriptionPort = port + 1;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index 3f104fe..bf7ec75 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -25,9 +25,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.json.JSONArray;
 import org.json.JSONObject;
 
-import edu.uci.ics.asterix.api.common.APIFramework;
 import edu.uci.ics.asterix.api.common.SessionConfig;
-import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
 import edu.uci.ics.asterix.result.ResultReader;
 import edu.uci.ics.asterix.result.ResultUtils;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index 4e8427d..dd19c97 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -15,7 +15,6 @@
 package edu.uci.ics.asterix.api.http.servlet;
 
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -30,7 +29,6 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.io.IOUtils;
 import org.json.JSONObject;
 
-import edu.uci.ics.asterix.api.common.APIFramework;
 import edu.uci.ics.asterix.api.common.SessionConfig;
 import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
 import edu.uci.ics.asterix.aql.base.Statement;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index e5ddf2b..dad5975 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -16,6 +16,8 @@ package edu.uci.ics.asterix.aql.translator;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.rmi.RemoteException;
@@ -26,6 +28,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.logging.Level;
@@ -44,9 +48,12 @@ import edu.uci.ics.asterix.aql.base.Statement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
@@ -55,6 +62,7 @@ import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.ExternalDetailsDecl;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
 import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
 import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
@@ -69,6 +77,7 @@ import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
 import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.SetStatement;
+import edu.uci.ics.asterix.aql.expression.SubscribeFeedStatement;
 import edu.uci.ics.asterix.aql.expression.TypeDecl;
 import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
 import edu.uci.ics.asterix.aql.expression.TypeExpression;
@@ -82,8 +91,21 @@ import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint.FeedJointType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.feeds.FeedJoint;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
 import edu.uci.ics.asterix.file.DatasetOperations;
 import edu.uci.ics.asterix.file.DataverseOperations;
 import edu.uci.ics.asterix.file.ExternalIndexingOperations;
@@ -106,14 +128,17 @@ import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleEventSubscriber;
 import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
 import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;
@@ -138,6 +163,7 @@ import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexCompactSta
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import edu.uci.ics.asterix.translator.TypeTranslator;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -229,7 +255,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
 
         for (Statement stmt : aqlStatements) {
             validateOperation(activeDefaultDataverse, stmt);
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse,
+                    CentralFeedManager.getInstance());
             metadataProvider.setWriterFactory(writerFactory);
             metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
             metadataProvider.setOutputFile(outputFile);
@@ -307,7 +334,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
                     break;
                 }
 
-                case CREATE_FEED: {
+                case CREATE_PRIMARY_FEED:
+                case CREATE_SECONDARY_FEED: {
                     handleCreateFeedStatement(metadataProvider, stmt, hcc);
                     break;
                 }
@@ -316,6 +344,12 @@ public class AqlTranslator extends AbstractAqlTranslator {
                     handleDropFeedStatement(metadataProvider, stmt, hcc);
                     break;
                 }
+
+                case DROP_FEED_POLICY: {
+                    handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
                 case CONNECT_FEED: {
                     handleConnectFeedStatement(metadataProvider, stmt, hcc);
                     break;
@@ -326,6 +360,16 @@ public class AqlTranslator extends AbstractAqlTranslator {
                     break;
                 }
 
+                case SUBSCRIBE_FEED: {
+                    handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
+                case CREATE_FEED_POLICY: {
+                    handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
+
                 case QUERY: {
                     metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                     metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
@@ -468,7 +512,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
 
         ProgressState progress = ProgressState.NO_PROGRESS;
         DatasetDecl dd = (DatasetDecl) stmt;
-        String dataverseName = getActiveDataverseName(dd.getDataverse());
+        String dataverseName = getActiveDataverse(dd.getDataverse());
         String datasetName = dd.getName().getValue();
         DatasetType dsType = dd.getDatasetType();
         String itemTypeName = dd.getItemTypeName().getValue();
@@ -647,6 +691,27 @@ public class AqlTranslator extends AbstractAqlTranslator {
         }
     }
 
+    private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
+        List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+        boolean resourceInUse = false;
+        StringBuilder builder = new StringBuilder();
+
+        if (activeFeedConnections != null && !activeFeedConnections.isEmpty()) {
+            for (FeedConnectionId connId : activeFeedConnections) {
+                if (connId.getDatasetName().equals(datasetName)) {
+                    resourceInUse = true;
+                    builder.append(connId + "\n");
+                }
+            }
+        }
+
+        if (resourceInUse) {
+            throw new AsterixException("Dataset " + datasetName + " is currently being "
+                    + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
+        }
+
+    }
+
     private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
         if (ngNameId != null) {
             return ngNameId.getValue();
@@ -716,7 +781,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
             IHyracksClientConnection hcc) throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-        String dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
         String datasetName = stmtCreateIndex.getDatasetName().getValue();
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -826,19 +891,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
-                        datasetName);
-                if (feedActivities != null && !feedActivities.isEmpty()) {
-                    StringBuilder builder = new StringBuilder();
-
-                    for (FeedActivity fa : feedActivities) {
-                        builder.append(fa + "\n");
-                    }
-                    throw new AsterixException("Dataset" + datasetName
-                            + " is currently being fed into by the following feeds " + "." + builder.toString()
-                            + "\nOperation not supported.");
-                }
-
+                validateIfResourceIsActiveInFeed(dataverseName, datasetName);
             } else {
                 // External dataset
                 // Check if the dataset is indexible
@@ -1070,7 +1123,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
 
     private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
         TypeDecl stmtCreateType = (TypeDecl) stmt;
-        String dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtCreateType.getDataverseName());
         String typeName = stmtCreateType.getIdent().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1130,21 +1183,26 @@ public class AqlTranslator extends AbstractAqlTranslator {
             }
 
             //# disconnect all feeds from any datasets in the dataverse.
-            List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName, null);
+            List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
+                    .getActiveFeedConnections(null);
             DisconnectFeedStatement disStmt = null;
             Identifier dvId = new Identifier(dataverseName);
-            for (FeedActivity fa : feedActivities) {
-                disStmt = new DisconnectFeedStatement(dvId, new Identifier(fa.getFeedName()), new Identifier(
-                        fa.getDatasetName()));
-                try {
-                    handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Disconnected feed " + fa.getFeedName() + " from dataset " + fa.getDatasetName());
-                    }
-                } catch (Exception exception) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to disconnect feed " + fa.getFeedName() + " from dataset "
-                                + fa.getDatasetName() + ". Encountered exception " + exception);
+            for (FeedConnectionId connection : activeFeedConnections) {
+                FeedId feedId = connection.getFeedId();
+                if (feedId.getDataverse().equals(dataverseName)) {
+                    disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()), new Identifier(
+                            connection.getDatasetName()));
+                    try {
+                        handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Disconnected feed " + feedId.getFeedName() + " from dataset "
+                                    + connection.getDatasetName());
+                        }
+                    } catch (Exception exception) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to disconnect feed " + feedId.getFeedName() + " from dataset "
+                                    + connection.getDatasetName() + ". Encountered exception " + exception);
+                        }
                     }
                 }
             }
@@ -1258,7 +1316,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
     private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropStatement stmtDelete = (DropStatement) stmt;
-        String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
         String datasetName = stmtDelete.getDatasetName().getValue();
 
         ProgressState progress = ProgressState.NO_PROGRESS;
@@ -1281,19 +1339,18 @@ public class AqlTranslator extends AbstractAqlTranslator {
                 }
             }
 
+            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-                List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
-                        datasetName);
-                List<JobSpecification> disconnectFeedJobSpecs = new ArrayList<JobSpecification>();
-                if (feedActivities != null && !feedActivities.isEmpty()) {
-                    for (FeedActivity fa : feedActivities) {
-                        JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName,
-                                fa.getFeedName(), datasetName, metadataProvider, fa);
-                        disconnectFeedJobSpecs.add(jobSpec);
+                List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+                if (feedConnections != null && !feedConnections.isEmpty()) {
+                    for (FeedConnectionId connection : feedConnections) {
+                        Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
+                                connection);
+                        disconnectJobList.put(connection, p);
                         if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Disconnected feed " + fa.getFeedName() + " from dataset " + datasetName
-                                    + " as dataset is being dropped");
+                            LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
+                                    + datasetName + " as dataset is being dropped");
                         }
                     }
                 }
@@ -1322,8 +1379,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 //# disconnect the feeds
-                for (JobSpecification jobSpec : disconnectFeedJobSpecs) {
-                    runJob(hcc, jobSpec, true);
+                for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
+                    runJob(hcc, p.first, true);
                 }
 
                 //#. run the jobs
@@ -1427,7 +1484,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
 
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
         String datasetName = stmtIndexDrop.getDatasetName().getValue();
-        String dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName());
         ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
@@ -1447,19 +1504,24 @@ public class AqlTranslator extends AbstractAqlTranslator {
                         + dataverseName);
             }
 
-            List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
-                    datasetName);
-            if (feedActivities != null && !feedActivities.isEmpty()) {
+            List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+            boolean resourceInUse = false;
+            if (feedConnections != null && !feedConnections.isEmpty()) {
                 StringBuilder builder = new StringBuilder();
-
-                for (FeedActivity fa : feedActivities) {
-                    builder.append(fa + "\n");
+                for (FeedConnectionId connection : feedConnections) {
+                    if (connection.getDatasetName().equals(datasetName)) {
+                        resourceInUse = true;
+                        builder.append(connection + "\n");
+                    }
+                }
+                if (resourceInUse) {
+                    throw new AsterixException("Dataset " + datasetName
+                            + " is currently being fed into by the following feeds: " + builder.toString()
+                            + "\nOperation not supported.");
                 }
-                throw new AsterixException("Dataset" + datasetName
-                        + " is currently being fed into by the following feeds " + "." + builder.toString()
-                        + "\nOperation not supported.");
             }
 
+
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
@@ -1620,7 +1682,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
     private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
         TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
-        String dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtTypeDrop.getDataverseName());
         String typeName = stmtTypeDrop.getTypeName().getValue();
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1725,7 +1787,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
     private void handleLoadStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
             throws Exception {
         LoadStatement loadStmt = (LoadStatement) stmt;
-        String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
+        String dataverseName = getActiveDataverse(loadStmt.getDataverseName());
         String datasetName = loadStmt.getDatasetName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
@@ -1756,7 +1818,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
             IHyracksClientConnection hcc) throws Exception {
 
         InsertStatement stmtInsert = (InsertStatement) stmt;
-        String dataverseName = getActiveDataverseName(stmtInsert.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtInsert.getDataverseName());
         Query query = stmtInsert.getQuery();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
@@ -1792,7 +1854,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
             IHyracksClientConnection hcc) throws Exception {
 
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
-        String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1846,29 +1908,40 @@ public class AqlTranslator extends AbstractAqlTranslator {
             IHyracksClientConnection hcc) throws Exception {
 
         CreateFeedStatement cfs = (CreateFeedStatement) stmt;
-        String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+        String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
 
-        String adapterName = null;
         Feed feed = null;
         try {
-            adapterName = cfs.getAdapterName();
-
             feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
             if (feed != null) {
                 if (cfs.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     return;
                 } else {
-                    throw new AlgebricksException("A feed with this name " + adapterName + " already exists.");
+                    throw new AlgebricksException("A feed with this name " + feedName + " already exists.");
                 }
             }
 
-            feed = new Feed(dataverseName, feedName, adapterName, cfs.getAdapterConfiguration(),
-                    cfs.getAppliedFunction());
+            switch (stmt.getKind()) {
+                case CREATE_PRIMARY_FEED:
+                    CreatePrimaryFeedStatement cpfs = (CreatePrimaryFeedStatement) stmt;
+                    String adaptorName = cpfs.getAdaptorName();
+                    feed = new PrimaryFeed(dataverseName, feedName, adaptorName, cpfs.getAdaptorConfiguration(),
+                            cfs.getAppliedFunction());
+                    break;
+                case CREATE_SECONDARY_FEED:
+                    CreateSecondaryFeedStatement csfs = (CreateSecondaryFeedStatement) stmt;
+                    feed = new SecondaryFeed(dataverseName, feedName, csfs.getSourceFeedName(),
+                            csfs.getAppliedFunction());
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+
             MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -1878,11 +1951,72 @@ public class AqlTranslator extends AbstractAqlTranslator {
             MetadataLockManager.INSTANCE.createFeedEnd(dataverseName, dataverseName + "." + feedName);
         }
     }
+    
+    private void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        String dataverse;
+        String policy;
+        FeedPolicy newPolicy = null;
+        CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
+        dataverse = getActiveDataverse(null);
+        policy = cfps.getPolicyName();
+        MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." + policy);
+        try {
+            FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                    dataverse, policy);
+            if (feedPolicy != null) {
+                if (cfps.getIfNotExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return;
+                } else {
+                    throw new AlgebricksException("A policy with this name " + policy + " already exists.");
+                }
+            }
+            boolean extendingExisting = cfps.getSourcePolicyName() != null;
+            String description = cfps.getDescription() == null ? "" : cfps.getDescription();
+            if (extendingExisting) {
+                FeedPolicy sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
+                        metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
+                if (sourceFeedPolicy == null) {
+                    sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                            MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+                    if (sourceFeedPolicy == null) {
+                        throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName());
+                    }
+                }
+                Map<String, String> policyProperties = sourceFeedPolicy.getProperties();
+                policyProperties.putAll(cfps.getProperties());
+                newPolicy = new FeedPolicy(dataverse, policy, description, policyProperties);
+            } else {
+                Properties prop = new Properties();
+                try {
+                    InputStream stream = new FileInputStream(cfps.getSourcePolicyFile());
+                    prop.load(stream);
+                } catch (Exception e) {
+                    throw new AlgebricksException("Unable to read policy file " + cfps.getSourcePolicyFile());
+                }
+                Map<String, String> policyProperties = new HashMap<String, String>();
+                for (Entry<Object, Object> entry : prop.entrySet()) {
+                    policyProperties.put((String) entry.getKey(), (String) entry.getValue());
+                }
+                newPolicy = new FeedPolicy(dataverse, policy, description, policyProperties);
+            }
+            MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, newPolicy);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.createFeedPolicyEnd(dataverse, dataverse + "." + policy);
+        }
+    }
 
     private void handleDropFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         FeedDropStatement stmtFeedDrop = (FeedDropStatement) stmt;
-        String dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtFeedDrop.getDataverseName());
         String feedName = stmtFeedDrop.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1896,27 +2030,25 @@ public class AqlTranslator extends AbstractAqlTranslator {
                 }
             }
 
-            List<FeedActivity> feedActivities;
-            try {
-                feedActivities = MetadataManager.INSTANCE.getConnectFeedActivitiesForFeed(mdTxnCtx, dataverseName,
-                        feedName);
+            FeedId feedId = new FeedId(dataverseName, feedName);
+            List<FeedConnectionId> activeConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(feedId);
+            if (activeConnections != null && !activeConnections.isEmpty()) {
+                StringBuilder builder = new StringBuilder();
+                for (FeedConnectionId connectionId : activeConnections) {
+                    builder.append(connectionId.getDatasetName() + "\n");
+                }
+
+                throw new AlgebricksException("Feed " + feedId
+                        + " is currently active and connected to the following dataset(s):\n" + builder.toString());
+            } else {
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            } catch (Exception e) {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                throw new MetadataException(e);
             }
 
-            List<JobSpecification> jobSpecs = new ArrayList<JobSpecification>();
-            for (FeedActivity feedActivity : feedActivities) {
-                JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName, feedName,
-                        feedActivity.getDatasetName(), metadataProvider, feedActivity);
-                jobSpecs.add(jobSpec);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Removed feed " + feedId);
             }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
 
-            for (JobSpecification spec : jobSpecs) {
-                runJob(hcc, spec, true);
-            }
 
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
@@ -1925,78 +2057,106 @@ public class AqlTranslator extends AbstractAqlTranslator {
             MetadataLockManager.INSTANCE.dropFeedEnd(dataverseName, dataverseName + "." + feedName);
         }
     }
+    
+    private void handleDropFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt;
+        String dataverseName = getActiveDataverse(stmtFeedPolicyDrop.getDataverseName());
+        String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
+        MetadataLockManager.INSTANCE.dropFeedPolicyBegin(dataverseName, dataverseName + "." + policyName);
+
+        try {
+            FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
+            if (feedPolicy == null) {
+                if (!stmtFeedPolicyDrop.getIfExists()) {
+                    throw new AlgebricksException("Unknown policy " + policyName + " in dataverse " + dataverseName);
+                }
+            }
+            MetadataManager.INSTANCE.dropFeedPolicy(mdTxnCtx, dataverseName, policyName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.dropFeedPolicyEnd(dataverseName, dataverseName + "." + policyName);
+        }
+    }
+
 
     private void handleConnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
         ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
-        String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+        String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String feedName = cfs.getFeedName();
         String datasetName = cfs.getDatasetName().getValue();
 
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        boolean readLatchAcquired = true;
+        boolean subscriberRegistered = false;
+        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        FeedConnectionId feedConnId = null;
+
         MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, dataverseName
                 + "." + feedName);
-        boolean readLatchAcquired = true;
         try {
-
             metadataProvider.setWriteTransaction(true);
 
             CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(), cfs
                     .getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
 
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
-                    dataverseName, cfs.getDatasetName().getValue());
-            if (dataset == null) {
-                throw new AsterixException("Unknown target dataset :" + cfs.getDatasetName().getValue());
-            }
+            FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
+                    metadataProvider.getMetadataTxnContext());
 
-            if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
-                throw new AsterixException("Statement not applicable. Dataset " + cfs.getDatasetName().getValue()
-                        + " is not of required type " + DatasetType.INTERNAL);
-            }
+            Feed feed = FeedUtil.validateIfFeedExists(dataverseName, cfs.getFeedName(),
+                    metadataProvider.getMetadataTxnContext());
 
-            Feed feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    cfs.getFeedName());
-            if (feed == null) {
-                throw new AsterixException("Unknown source feed: " + cfs.getFeedName());
-            }
+            feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue());
 
-            FeedConnectionId feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName()
-                    .getValue());
-            FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(mdTxnCtx,
-                    feedConnId, null);
-            boolean isFeedActive = FeedUtil.isFeedActive(recentActivity);
-            if (isFeedActive && !cfs.forceConnect()) {
-                throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
-                        + " is currently ACTIVE. Operation not supported");
+            if (FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId)) {
+                throw new AsterixException("Feed " + cfs.getFeedName() + " is already connected to dataset "
+                        + cfs.getDatasetName().getValue());
             }
 
-            FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName,
-                    cbfs.getPolicyName());
-            if (feedPolicy == null) {
-                feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx,
-                        MetadataConstants.METADATA_DATAVERSE_NAME, cbfs.getPolicyName());
-                if (feedPolicy == null) {
-                    throw new AsterixException("Unknown feed policy" + cbfs.getPolicyName());
-                }
-            }
+            FeedPolicy feedPolicy = FeedUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx);
 
-            cfs.initialize(metadataProvider.getMetadataTxnContext(), dataset, feed);
-            cbfs.setQuery(cfs.getQuery());
-            metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
-            metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
-            JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnId, feedPolicy);
+            // All metadata checks have passed; the feed connect request is valid.
 
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Altered feed ingestion spec to wrap operators");
-            }
+            FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
+            Pair<FeedConnectionRequest, Boolean> p = getFeedConnectionRequest(dataverseName, feed,
+                    cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
+            FeedConnectionRequest connectionRequest = p.first;
+            boolean createFeedIntakeJob = p.second;
 
+            FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, eventSubscriber);
+            subscriberRegistered = true;
+            if (createFeedIntakeJob) {
+                FeedId feedId = connectionRequest.getFeedJointKey().getFeedId();
+                PrimaryFeed primaryFeed = (PrimaryFeed) MetadataManager.INSTANCE.getFeed(mdTxnCtx,
+                        feedId.getDataverse(), feedId.getFeedName());
+                Pair<JobSpecification, IFeedAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
+                        metadataProvider, policyAccessor);
+                runJob(hcc, pair.first, false);
+                IFeedAdapterFactory adapterFactory = pair.second;
+                if (adapterFactory.isRecordTrackingEnabled()) {
+                    FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
+                            adapterFactory.createIntakeProgressTracker());
+                }
+                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+            }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
+            readLatchAcquired = false;
+            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+            if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
+                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
+            }
             String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
             boolean waitForCompletion = waitForCompletionParam == null ? false : Boolean
                     .valueOf(waitForCompletionParam);
@@ -2005,7 +2165,6 @@ public class AqlTranslator extends AbstractAqlTranslator {
                         dataverseName + "." + feedName);
                 readLatchAcquired = false;
             }
-            runJob(hcc, newJobSpec, waitForCompletion);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -2016,18 +2175,127 @@ public class AqlTranslator extends AbstractAqlTranslator {
                 MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
                         dataverseName + "." + feedName);
             }
+            if (subscriberRegistered) {
+                FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber);
+            }
+        }
+    }
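
The connect path above leans on the new event-subscription machinery: the translator registers a subscriber, launches the intake/collect jobs, and then parks on assertEvent until the runtime reports the corresponding lifecycle transition. A condensed sketch of that pattern (class names are the ones introduced by this patch; the dataverse/feed/dataset names are illustrative, and imports are elided):

    IFeedLifecycleEventSubscriber subscriber = new FeedLifecycleEventSubscriber();
    FeedConnectionId connId = new FeedConnectionId(new FeedId("demo", "TwitterFeed"), "Tweets");
    FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(connId, subscriber);
    try {
        // Blocks until the collect job for this connection is up and running.
        subscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
    } finally {
        FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(connId, subscriber);
    }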
+    
+    /**
+     * Generates a subscription request corresponding to a connect feed request. In addition, provides a boolean
+     * flag indicating whether a feed intake job needs to be started (i.e., the source primary feed was not found
+     * to be active).
+     * 
+     * @param dataverse
+     *            the dataverse containing the target dataset
+     * @param feed
+     *            the feed that is being connected
+     * @param dataset
+     *            the name of the target dataset
+     * @param feedPolicy
+     *            the ingestion policy to associate with the connection
+     * @param mdTxnCtx
+     *            the active metadata transaction context
+     * @return a pair of the feed connection request and a flag that is true if a feed intake job must be started
+     * @throws MetadataException
+     */
+    private Pair<FeedConnectionRequest, Boolean> getFeedConnectionRequest(String dataverse, Feed feed, String dataset,
+            FeedPolicy feedPolicy, MetadataTransactionContext mdTxnCtx) throws MetadataException {
+        IFeedJoint sourceFeedJoint = null;
+        FeedConnectionRequest request = null;
+        List<String> functionsToApply = new ArrayList<String>();
+        boolean needIntakeJob = false;
+        List<IFeedJoint> jointsToRegister = new ArrayList<IFeedJoint>();
+        FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), dataset);
+
+        ConnectionLocation connectionLocation = null;
+        FeedJointKey feedJointKey = getFeedJointKey(feed, mdTxnCtx);
+        boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
+        if (!isFeedJointAvailable) {
+            sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
+            if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
+                connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
+                FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId 
+                Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
+                FeedJointKey intakeFeedJointKey = new FeedJointKey(sourceFeedId, new ArrayList<String>());
+                sourceFeedJoint = new FeedJoint(intakeFeedJointKey, primaryFeed.getFeedId(), connectionLocation,
+                        FeedJointType.INTAKE, connectionId);
+                jointsToRegister.add(sourceFeedJoint);
+                needIntakeJob = true;
+            } else {
+                connectionLocation = sourceFeedJoint.getConnectionLocation();
+            }
+
+            String[] functions = feedJointKey.getStringRep()
+                    .substring(sourceFeedJoint.getFeedJointKey().getStringRep().length()).trim().split(":");
+            for (String f : functions) {
+                if (f.trim().length() > 0) {
+                    functionsToApply.add(f);
+                }
+            }
+            // Register the compute feed joint that represents the final output of the chain of
+            // functions that will be applied.
+            if (!functionsToApply.isEmpty()) {
+                FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
+                IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
+                        ConnectionLocation.SOURCE_FEED_COMPUTE_STAGE, FeedJointType.COMPUTE, connectionId);
+                jointsToRegister.add(computeFeedJoint);
+            }
+            for (IFeedJoint joint : jointsToRegister) {
+                FeedLifecycleListener.INSTANCE.registerFeedJoint(joint);
+            }
+        } else {
+            sourceFeedJoint = FeedLifecycleListener.INSTANCE.getFeedJoint(feedJointKey);
+            connectionLocation = sourceFeedJoint.getConnectionLocation();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Feed joint " + sourceFeedJoint + " is available! need not apply any further computation");
+            }
         }
+
+        request = new FeedConnectionRequest(sourceFeedJoint.getFeedJointKey(), connectionLocation, functionsToApply,
+                dataset, feedPolicy.getPolicyName(), feedPolicy.getProperties(), feed.getFeedId());
+
+        sourceFeedJoint.addConnectionRequest(request);
+        return new Pair<FeedConnectionRequest, Boolean>(request, needIntakeJob);
     }
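
The residual-function computation above is plain string arithmetic on the joint keys: whatever portion of the requested key extends past the available source joint's key is the chain of functions still to be applied. A minimal sketch, assuming a ':'-separated getStringRep() format of the form "<feedId>:f1:f2" (the feed and function names are hypothetical):

    String requestedKey = "demo.RawFeed:extractHashTags:filterEnglish";
    String availableKey = "demo.RawFeed:extractHashTags";
    List<String> functionsToApply = new ArrayList<String>();
    for (String f : requestedKey.substring(availableKey.length()).trim().split(":")) {
        if (f.trim().length() > 0) {
            functionsToApply.add(f);
        }
    }
    // functionsToApply -> [filterEnglish]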
+    
+    /*
+     * Gets the feed joint corresponding to the feed definition. Tuples constituting the feed are 
+     * available at this feed joint.
+     */
+    private FeedJointKey getFeedJointKey(Feed feed, MetadataTransactionContext ctx) throws MetadataException {
+        Feed sourceFeed = feed;
+        List<String> appliedFunctions = new ArrayList<String>();
+        while (sourceFeed.getFeedType().equals(FeedType.SECONDARY)) {
+            if (sourceFeed.getAppliedFunction() != null) {
+                appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName());
+            }
+            Feed parentFeed = MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(),
+                    ((SecondaryFeed) sourceFeed).getSourceFeedName());
+            sourceFeed = parentFeed;
+        }
+
+        if (sourceFeed.getAppliedFunction() != null) {
+            appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName());
+        }
 
+        return new FeedJointKey(sourceFeed.getFeedId(), appliedFunctions);
+    }
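
Because each applied function is inserted at index 0 while the loop walks from the child feed up to its parent, the resulting list reads in application order starting from the primary feed. A toy illustration (hypothetical function names; the real code walks SecondaryFeed metadata entities):

    // Traversal encounters functions in child-to-parent order.
    String[] walked = { "filterEnglish", "extractHashTags" };
    List<String> appliedFunctions = new ArrayList<String>();
    for (String f : walked) {
        appliedFunctions.add(0, f);
    }
    // appliedFunctions -> [extractHashTags, filterEnglish]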
+    
     private void handleDisconnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
-        String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+        String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String datasetName = cfs.getDatasetName().getValue();
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
+        Feed feed = FeedUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
+
+        FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
+        boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId);
+        if (!isFeedConnectionActive) {
+            throw new AsterixException("Feed " + feed.getFeedId().getFeedName() + " is currently not connected to "
+                    + cfs.getDatasetName().getValue() + ". Invalid operation!");
+        }
+
         MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
                 dataverseName + "." + cfs.getFeedName());
         try {
@@ -2037,47 +2305,87 @@ public class AqlTranslator extends AbstractAqlTranslator {
                 throw new AsterixException("Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse "
                         + dataverseName);
             }
-            if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
-                throw new AsterixException("Statement not applicable. Dataset " + cfs.getDatasetName().getValue()
-                        + " is not of required type " + DatasetType.INTERNAL);
+
+            Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations.buildDisconnectFeedJobSpec(
+                    metadataProvider, connectionId);
+            JobSpecification jobSpec = specDisconnectType.first;
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            runJob(hcc, jobSpec, true);
+
+            if (!specDisconnectType.second) {
+                CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
+                FeedLifecycleListener.INSTANCE.reportPartialDisconnection(connectionId);
             }
 
-            Feed feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, cfs
-                    .getFeedName().getValue());
-            if (feed == null) {
-                throw new AsterixException("Unknown source feed :" + cfs.getFeedName());
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
             }
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                    dataverseName + "." + cfs.getFeedName());
+        }
+    }
+    
+    private void handleSubscribeFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
 
-            FeedActivity feedActivity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(mdTxnCtx,
-                    new FeedConnectionId(dataverseName, feed.getFeedName(), datasetName), null);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Subscriber Feed Statement :" + stmt);
+        }
 
-            boolean isFeedActive = FeedUtil.isFeedActive(feedActivity);
-            if (!isFeedActive) {
-                throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
-                        + " is currently INACTIVE. Operation not supported");
-            }
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        metadataProvider.setWriteTransaction(true);
+        SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt;
+        bfs.initialize(metadataProvider.getMetadataTxnContext());
+
+        CompiledSubscribeFeedStatement csfs = new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(),
+                bfs.getQuery(), bfs.getVarCounter());
+        metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
+        metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + bfs.getPolicy());
+        metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
+                StringUtils.join(bfs.getLocations(), ','));
+
+        JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs);
+        FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), bfs
+                .getSubscriptionRequest().getTargetDataset());
+        String dataverse = feedConnectionId.getFeedId().getDataverse();
+        String dataset = feedConnectionId.getDatasetName();
+        MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset, dataverse + "."
+                + feedConnectionId.getFeedId().getFeedName());
 
-            JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName, cfs.getFeedName()
-                    .getValue(), cfs.getDatasetName().getValue(), metadataProvider, feedActivity);
+        try {
 
+            JobSpecification alteredJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnectionId, bfs
+                    .getSubscriptionRequest().getPolicyParameters());
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            runJob(hcc, jobSpec, true);
+
+            if (compiled != null) {
+                runJob(hcc, alteredJobSpec, false);
+            }
+
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                    dataverseName + "." + cfs.getFeedName());
+            MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset, dataverse + "."
+                    + feedConnectionId.getFeedId().getFeedName());
         }
     }
 
     private void handleCompactStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         CompactStatement compactStatement = (CompactStatement) stmt;
-        String dataverseName = getActiveDataverseName(compactStatement.getDataverseName());
+        String dataverseName = getActiveDataverse(compactStatement.getDataverseName());
         String datasetName = compactStatement.getDatasetName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
@@ -2261,7 +2569,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
     private void handleExternalDatasetRefreshStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         RefreshExternalDatasetStatement stmtRefresh = (RefreshExternalDatasetStatement) stmt;
-        String dataverseName = getActiveDataverseName(stmtRefresh.getDataverseName());
+        String dataverseName = getActiveDataverse(stmtRefresh.getDataverseName());
         String datasetName = stmtRefresh.getDatasetName().getValue();
         ExternalDatasetTransactionState transactionState = ExternalDatasetTransactionState.COMMIT;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2503,8 +2811,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
         RunStatement pregelixStmt = (RunStatement) stmt;
         boolean bActiveTxn = true;
 
-        String dataverseNameFrom = getActiveDataverseName(pregelixStmt.getDataverseNameFrom());
-        String dataverseNameTo = getActiveDataverseName(pregelixStmt.getDataverseNameTo());
+        String dataverseNameFrom = getActiveDataverse(pregelixStmt.getDataverseNameFrom());
+        String dataverseNameTo = getActiveDataverse(pregelixStmt.getDataverseNameTo());
         String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
         String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
 
@@ -2745,7 +3053,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
         throw new AlgebricksException("dataverse not specified");
     }
 
-    private String getActiveDataverseName(Identifier dataverse) throws AlgebricksException {
+    private String getActiveDataverse(Identifier dataverse) throws AlgebricksException {
         return getActiveDataverseName(dataverse != null ? dataverse.getValue() : null);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java
new file mode 100644
index 0000000..359da16
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.util.List;
+
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.metadata.feeds.SocketMessageListener;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class CentralFeedManager implements ICentralFeedManager {
+
+    private static final ICentralFeedManager centralFeedManager = new CentralFeedManager();
+
+    public static ICentralFeedManager getInstance() {
+        return centralFeedManager;
+    }
+
+    private final int port;
+    private final IFeedLoadManager feedLoadManager;
+    private final IFeedTrackingManager feedTrackingManager;
+    private final SocketMessageListener messageListener;
+
+    private CentralFeedManager() {
+        this.port = AsterixAppContextInfo.getInstance().getFeedProperties().getFeedCentralManagerPort();
+        this.feedLoadManager = new FeedLoadManager();
+        this.feedTrackingManager = new FeedTrackingManager();
+        this.messageListener = new SocketMessageListener(port, new FeedMessageReceiver(this));
+    }
+
+    @Override
+    public void start() throws AsterixException {
+        messageListener.start();
+    }
+
+    @Override
+    public void stop() throws AsterixException, IOException {
+        messageListener.stop();
+    }
+
+    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobId jobId = hcc.startJob(spec);
+        if (waitForCompletion) {
+            hcc.waitForCompletion(jobId);
+        }
+        return jobId;
+    }
+
+    @Override
+    public IFeedLoadManager getFeedLoadManager() {
+        return feedLoadManager;
+    }
+
+    @Override
+    public IFeedTrackingManager getFeedTrackingManager() {
+        return feedTrackingManager;
+    }
+
+    public static class AQLExecutor {
+
+        private static final PrintWriter out = new PrintWriter(System.out, true);
+
+        public static void executeAQL(String aql) throws Exception {
+            AQLParser parser = new AQLParser(new StringReader(aql));
+            List<Statement> statements;
+            statements = parser.Statement();
+            SessionConfig pc = new SessionConfig(out, OutputFormat.ADM);
+            AqlTranslator translator = new AqlTranslator(statements, pc);
+            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, AqlTranslator.ResultDelivery.SYNC);
+        }
+    }
+
+}
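
The AQLExecutor helper gives internal components a direct line into the translator without going through the HTTP API. A hypothetical invocation (the AQL text is illustrative only):

    // e.g., issued by an internal feed-management task
    CentralFeedManager.AQLExecutor.executeAQL("disconnect feed TwitterFeed from dataset Tweets;");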

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java
new file mode 100644
index 0000000..db3ce77
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedCollectInfo extends FeedInfo {
+    public FeedId sourceFeedId;
+    public FeedConnectionId feedConnectionId;
+    public List<String> collectLocations = new ArrayList<String>();
+    public List<String> computeLocations = new ArrayList<String>();
+    public List<String> storageLocations = new ArrayList<String>();
+    public Map<String, String> feedPolicy;
+    public String superFeedManagerHost;
+    public int superFeedManagerPort;
+    public boolean fullyConnected;
+
+    public FeedCollectInfo(FeedId sourceFeedId, FeedConnectionId feedConnectionId, JobSpecification jobSpec,
+            JobId jobId, Map<String, String> feedPolicy) {
+        super(jobSpec, jobId, FeedInfoType.COLLECT);
+        this.sourceFeedId = sourceFeedId;
+        this.feedConnectionId = feedConnectionId;
+        this.feedPolicy = feedPolicy;
+        this.fullyConnected = true;
+    }
+
+    @Override
+    public String toString() {
+        return FeedInfoType.COLLECT + "[" + feedConnectionId + "]";
+    }
+}