You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:20 UTC
[23/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
deleted file mode 100644
index 5cb53b2..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDashboardServlet.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.api.http.servlet;
-
-import java.awt.image.BufferedImage;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.imageio.ImageIO;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-
-public class FeedDashboardServlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOGGER = Logger.getLogger(FeedDashboardServlet.class.getName());
-
- @Override
- public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
- String resourcePath = null;
- String requestURI = request.getRequestURI();
-
- if (requestURI.equals("/")) {
- response.setContentType("text/html");
- resourcePath = "/feed/dashboard.html";
- } else {
- resourcePath = requestURI + ".html";
- }
-
- try {
- InputStream is = FeedDashboardServlet.class.getResourceAsStream(resourcePath);
- if (is == null) {
- response.sendError(HttpServletResponse.SC_NOT_FOUND);
- return;
- }
-
- // Special handler for font files and .png resources
- if (resourcePath.endsWith(".png")) {
-
- BufferedImage img = ImageIO.read(is);
- OutputStream outputStream = response.getOutputStream();
- String formatName = "png";
- response.setContentType("image/png");
- ImageIO.write(img, formatName, outputStream);
- outputStream.close();
- return;
-
- }
-
- response.setCharacterEncoding("utf-8");
- InputStreamReader isr = new InputStreamReader(is);
- StringBuilder sb = new StringBuilder();
- BufferedReader br = new BufferedReader(isr);
- String line = br.readLine();
-
- while (line != null) {
- sb.append(line + "\n");
- line = br.readLine();
- }
-
- String feedName = request.getParameter("feed");
- String datasetName = request.getParameter("dataset");
- String dataverseName = request.getParameter("dataverse");
-
- FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
-
- String outStr = null;
- if (requestURI.startsWith("/webui/static")) {
- outStr = sb.toString();
- } else {
- MetadataManager.INSTANCE.init();
- MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
- FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId,
- FeedActivityType.FEED_BEGIN);
- MetadataManager.INSTANCE.commitTransaction(ctx);
-
- Map<String, String> activityDetails = activity.getFeedActivityDetails();
-
- String host = activityDetails.get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST);
- int port = Integer.parseInt(activityDetails
- .get(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT));
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" Super Feed Maanger address :" + host + "[" + port + "]");
- }
-
- String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
- String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
- String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
- String ingestionPolicy = activityDetails.get(FeedActivityDetails.FEED_POLICY_NAME);
- String activeSince = activity.getLastUpdatedTimestamp();
-
- outStr = String.format(sb.toString(), dataverseName, datasetName, feedName, ingestLocations,
- computeLocations, storageLocations, ingestionPolicy, activeSince);
- FeedServletUtil.initiateSubscription(feedId, host, port);
- }
-
- PrintWriter out = response.getWriter();
- out.println(outStr);
- } catch (ACIDException | MetadataException e) {
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
deleted file mode 100644
index 463ce01..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.api.http.servlet;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
-
-public class FeedDataProviderServlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
-
- String feedName = request.getParameter("feed");
- String datasetName = request.getParameter("dataset");
- String dataverseName = request.getParameter("dataverse");
-
- String report = getFeedReport(feedName, datasetName, dataverseName);
- System.out.println(" REPORT " + report);
- long timestamp = System.currentTimeMillis();
- JSONObject obj = null;
- if (report != null) {
- JSONArray array = new JSONArray();
- try {
- obj = new JSONObject();
- obj.put("type", "report");
- obj.put("time", timestamp);
- obj.put("value", report);
- } catch (JSONException jsoe) {
- throw new IOException(jsoe);
- }
- } else {
- obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
- }
-
- PrintWriter out = response.getWriter();
- out.println(obj.toString());
- }
-
- private String getFeedReport(String feedName, String datasetName, String dataverseName) {
- FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
- LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
- String report = null;
- try {
- report = queue.poll(25, TimeUnit.SECONDS);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return report;
- }
-
- private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
- JSONObject obj = new JSONObject();
- try {
- MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
- List<FeedActivity> feedActivities = MetadataManager.INSTANCE
- .getActiveFeeds(ctx, dataverseName, datasetName);
- FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
- FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId, null);
- switch (activity.getActivityType()) {
- case FEED_BEGIN:
- Map<String, String> activityDetails = activity.getFeedActivityDetails();
- String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
- String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
- String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
- obj.put("status", "active");
- obj.put("type", "reload");
- obj.put("ingestLocations", ingestLocations);
- obj.put("computeLocations", computeLocations);
- obj.put("storageLocations", storageLocations);
- System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
- + computeLocations + " storage at " + storageLocations);
- break;
- case FEED_FAILURE:
- obj.put("status", "failed");
- break;
- case FEED_END:
- obj.put("status", "ended");
- break;
- }
- } catch (Exception e) {
- // ignore
- }
- return obj;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
index 96b3c31..adca4c6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
@@ -21,19 +21,20 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
-import java.util.List;
+import java.util.Collection;
import javax.imageio.ImageIO;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
public class FeedServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@@ -50,73 +51,116 @@ public class FeedServlet extends HttpServlet {
resourcePath = requestURI;
}
- try {
- InputStream is = FeedServlet.class.getResourceAsStream(resourcePath);
- if (is == null) {
- response.sendError(HttpServletResponse.SC_NOT_FOUND);
- return;
- }
+ InputStream is = FeedServlet.class.getResourceAsStream(resourcePath);
+ if (is == null) {
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
- // Special handler for font files and .png resources
- if (resourcePath.endsWith(".png")) {
+ // Special handler for font files and .png resources
+ if (resourcePath.endsWith(".png")) {
- BufferedImage img = ImageIO.read(is);
- OutputStream outputStream = response.getOutputStream();
- String formatName = "png";
- response.setContentType("image/png");
- ImageIO.write(img, formatName, outputStream);
- outputStream.close();
- return;
+ BufferedImage img = ImageIO.read(is);
+ OutputStream outputStream = response.getOutputStream();
+ String formatName = "png";
+ response.setContentType("image/png");
+ ImageIO.write(img, formatName, outputStream);
+ outputStream.close();
+ return;
- }
+ }
- response.setCharacterEncoding("utf-8");
- InputStreamReader isr = new InputStreamReader(is);
- StringBuilder sb = new StringBuilder();
- BufferedReader br = new BufferedReader(isr);
- String line = br.readLine();
+ response.setCharacterEncoding("utf-8");
+ InputStreamReader isr = new InputStreamReader(is);
+ StringBuilder sb = new StringBuilder();
+ BufferedReader br = new BufferedReader(isr);
+ String line = br.readLine();
- while (line != null) {
- sb.append(line + "\n");
- line = br.readLine();
- }
+ while (line != null) {
+ sb.append(line + "\n");
+ line = br.readLine();
+ }
- String outStr = null;
- if (requestURI.startsWith("/webui/static")) {
- outStr = sb.toString();
+ String outStr = null;
+ if (requestURI.startsWith("/webui/static")) {
+ outStr = sb.toString();
+ } else {
+ Collection<FeedActivity> lfa = CentralFeedManager.getInstance().getFeedLoadManager().getFeedActivities();
+ StringBuilder ldStr = new StringBuilder();
+ ldStr.append("<br />");
+ ldStr.append("<br />");
+ if (lfa == null || lfa.isEmpty()) {
+ ldStr.append("Currently there are no active feeds in AsterixDB");
} else {
- MetadataManager.INSTANCE.init();
- MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
- List<FeedActivity> lfa = MetadataManager.INSTANCE.getActiveFeeds(ctx, null, null);
- StringBuilder ldStr = new StringBuilder();
- ldStr.append("<br />");
- ldStr.append("<br />");
- if (lfa == null || lfa.isEmpty()) {
- ldStr.append("Currently there are no active feeds in the Asterix");
- } else {
- ldStr.append("Active Feeds");
- }
- FeedConnectionId feedId = null;
- for (FeedActivity feedActivity : lfa) {
- feedId = new FeedConnectionId(feedActivity.getDataverseName(), feedActivity.getFeedName(),
- feedActivity.getDatasetName());
- ldStr.append("<br />");
- ldStr.append("<br />");
- ldStr.append("<a href=\"/feed/dashboard?dataverse=" + feedActivity.getDataverseName() + "&feed="
- + feedActivity.getFeedName() + "&dataset=" + feedActivity.getDatasetName() + "\">" + feedId
- + "</a>");
- ldStr.append("<br />");
- }
-
- outStr = String.format(sb.toString(), ldStr.toString());
- MetadataManager.INSTANCE.commitTransaction(ctx);
-
+ ldStr.append("Active Feeds");
}
+ insertTable(ldStr, lfa);
+ outStr = String.format(sb.toString(), ldStr.toString());
+
+ }
+
+ PrintWriter out = response.getWriter();
+ out.println(outStr);
+ }
- PrintWriter out = response.getWriter();
- out.println(outStr);
- } catch (ACIDException | MetadataException e) {
+    /**
+     * Appends an HTML table describing the given feed activities to {@code html}: a header row
+     * followed by one row per activity (see {@code insertRow}).
+     *
+     * @param html buffer that receives the generated markup
+     * @param list activities to render; may be {@code null} or empty, in which case only the
+     *             header row is emitted
+     */
+    private void insertTable(StringBuilder html, Collection<FeedActivity> list) {
+        html.append("<table style=\"width:100%\">");
+        // Header cells must live inside a row to form valid table markup.
+        html.append("<tr>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_FEED_NAME + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_DATASET_NAME + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_ACTIVE_SINCE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_STAGE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_COMPUTE_STAGE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_STAGE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_INTAKE_RATE + "</th>");
+        html.append("<th>" + FeedServletUtil.Constants.TABLE_HEADER_STORE_RATE + "</th>");
+        html.append("</tr>");
+        // doGet hands the collection over even when it is null, so guard before iterating.
+        if (list != null) {
+            for (FeedActivity activity : list) {
+                insertRow(html, activity);
+            }
+        }
+        html.append("</table>");
+    }
+    /**
+     * Appends one table row for {@code activity}: feed name, dataset name, connect timestamp,
+     * the intake/compute/store locations, and the aggregate intake and store rates.
+     *
+     * A rate is shown as {@code UNKNOWN} when the load manager reports a negative per-node rate
+     * or when the corresponding location list is missing. Both rates are highlighted in red only
+     * when both are known and intake outpaces store.
+     *
+     * @param html     buffer that receives the generated markup
+     * @param activity feed activity to render
+     */
+    private void insertRow(StringBuilder html, FeedActivity activity) {
+        String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS);
+        String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS);
+        String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
+
+        IFeedLoadManager loadManager = CentralFeedManager.getInstance().getFeedLoadManager();
+        FeedConnectionId connectionId = new FeedConnectionId(new FeedId(activity.getDataverseName(),
+                activity.getFeedName()), activity.getDatasetName());
+        int intakeRate = aggregateRate(loadManager.getOutflowRate(connectionId, FeedRuntimeType.COLLECT), intake);
+        int storeRate = aggregateRate(loadManager.getOutflowRate(connectionId, FeedRuntimeType.STORE), store);
+
+        // Comparing against an unknown (negative) rate is meaningless, so only flag known rates.
+        boolean bothKnown = intakeRate >= 0 && storeRate >= 0;
+        String color = (bothKnown && intakeRate > storeRate) ? "red" : "black";
+
+        html.append("<tr>");
+        html.append("<td>" + activity.getFeedName() + "</td>");
+        html.append("<td>" + activity.getDatasetName() + "</td>");
+        html.append("<td>" + activity.getConnectTimestamp() + "</td>");
+        html.append("<td>" + intake + "</td>");
+        html.append("<td>" + compute + "</td>");
+        html.append("<td>" + store + "</td>");
+        html.append("<td>" + formatRate(intakeRate, color) + "</td>");
+        html.append("<td>" + formatRate(storeRate, color) + "</td>");
+        html.append("</tr>");
+    }
+
+    /**
+     * Scales a per-node rate by the number of comma-separated locations.
+     *
+     * @return the aggregate rate, or -1 when the per-node rate is negative or the locations are missing
+     */
+    private static int aggregateRate(int perNodeRate, String locations) {
+        if (perNodeRate < 0 || locations == null) {
+            return -1;
+        }
+        return perNodeRate * locations.split(",").length;
+    }
+
+    /** Renders a rate cell body: {@code UNKNOWN} for a negative rate, otherwise the coloured value in rec/sec. */
+    private String formatRate(int rate, String color) {
+        if (rate < 0) {
+            return "UNKNOWN";
+        }
+        return insertColoredText("" + rate, color) + " rec/sec";
+    }
+
+    /**
+     * Builds an HTML anchor element pointing at {@code url} with {@code displayText} as its body.
+     * The {@code html} argument is not used by this method.
+     *
+     * @return the anchor markup as a string
+     */
+    private String insertLink(StringBuilder html, String url, String displayText) {
+        StringBuilder anchor = new StringBuilder("<a href=\"");
+        anchor.append(url).append("\">").append(displayText).append("</a>");
+        return anchor.toString();
+    }
+
+    /**
+     * Wraps {@code s} in a {@code <font>} element whose {@code color} attribute is {@code color}.
+     *
+     * @return the wrapped markup as a string
+     */
+    private String insertColoredText(String s, String color) {
+        return String.format("<font color=\"%s\">%s</font>", color, s);
+    }
}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
index ff29a23..8567810 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServletUtil.java
@@ -9,7 +9,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
import edu.uci.ics.asterix.metadata.feeds.RemoteSocketMessageListener;
public class FeedServletUtil {
@@ -17,6 +17,17 @@ public class FeedServletUtil {
private static final Logger LOGGER = Logger.getLogger(FeedServletUtil.class.getName());
private static final char EOL = (char) "\n".getBytes()[0];
+    /** Column header labels for the active-feeds table rendered by the feed servlet. */
+    public static final class Constants {
+        public static final String TABLE_HEADER_FEED_NAME = "Feed";
+        public static final String TABLE_HEADER_DATASET_NAME = "Dataset";
+        public static final String TABLE_HEADER_ACTIVE_SINCE = "Timestamp";
+        public static final String TABLE_HEADER_INTAKE_STAGE = "Intake Stage";
+        public static final String TABLE_HEADER_COMPUTE_STAGE = "Compute Stage";
+        public static final String TABLE_HEADER_STORE_STAGE = "Store Stage";
+        public static final String TABLE_HEADER_INTAKE_RATE = "Intake";
+        public static final String TABLE_HEADER_STORE_RATE = "Store";
+
+        // Constants holder only; never instantiated.
+        private Constants() {
+        }
+    }
+
public static void initiateSubscription(FeedConnectionId feedId, String host, int port) throws IOException {
LinkedBlockingQueue<String> outbox = new LinkedBlockingQueue<String>();
int subscriptionPort = port + 1;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index 3f104fe..bf7ec75 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -25,9 +25,7 @@ import javax.servlet.http.HttpServletResponse;
import org.json.JSONArray;
import org.json.JSONObject;
-import edu.uci.ics.asterix.api.common.APIFramework;
import edu.uci.ics.asterix.api.common.SessionConfig;
-import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.asterix.result.ResultUtils;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index 4e8427d..dd19c97 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.api.http.servlet;
import java.io.IOException;
-import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -30,7 +29,6 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.json.JSONObject;
-import edu.uci.ics.asterix.api.common.APIFramework;
import edu.uci.ics.asterix.api.common.SessionConfig;
import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
import edu.uci.ics.asterix.aql.base.Statement;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index e5ddf2b..dad5975 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -16,6 +16,8 @@ package edu.uci.ics.asterix.aql.translator;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.rmi.RemoteException;
@@ -26,6 +28,8 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.logging.Level;
@@ -44,9 +48,12 @@ import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
import edu.uci.ics.asterix.aql.expression.DatasetDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDecl;
import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
@@ -55,6 +62,7 @@ import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.ExternalDetailsDecl;
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
import edu.uci.ics.asterix.aql.expression.FunctionDecl;
import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
@@ -69,6 +77,7 @@ import edu.uci.ics.asterix.aql.expression.Query;
import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.SetStatement;
+import edu.uci.ics.asterix.aql.expression.SubscribeFeedStatement;
import edu.uci.ics.asterix.aql.expression.TypeDecl;
import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
import edu.uci.ics.asterix.aql.expression.TypeExpression;
@@ -82,8 +91,21 @@ import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedJointKey;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint.FeedJointType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.feeds.FeedJoint;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
import edu.uci.ics.asterix.file.DatasetOperations;
import edu.uci.ics.asterix.file.DataverseOperations;
import edu.uci.ics.asterix.file.ExternalIndexingOperations;
@@ -106,14 +128,17 @@ import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleEventSubscriber;
import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;
@@ -138,6 +163,7 @@ import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexCompactSta
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledInsertStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import edu.uci.ics.asterix.translator.TypeTranslator;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -229,7 +255,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
for (Statement stmt : aqlStatements) {
validateOperation(activeDefaultDataverse, stmt);
- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse,
+ CentralFeedManager.getInstance());
metadataProvider.setWriterFactory(writerFactory);
metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
metadataProvider.setOutputFile(outputFile);
@@ -307,7 +334,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
break;
}
- case CREATE_FEED: {
+ case CREATE_PRIMARY_FEED:
+ case CREATE_SECONDARY_FEED: {
handleCreateFeedStatement(metadataProvider, stmt, hcc);
break;
}
@@ -316,6 +344,12 @@ public class AqlTranslator extends AbstractAqlTranslator {
handleDropFeedStatement(metadataProvider, stmt, hcc);
break;
}
+
+ case DROP_FEED_POLICY: {
+ handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
+ break;
+ }
+
case CONNECT_FEED: {
handleConnectFeedStatement(metadataProvider, stmt, hcc);
break;
@@ -326,6 +360,16 @@ public class AqlTranslator extends AbstractAqlTranslator {
break;
}
+ case SUBSCRIBE_FEED: {
+ handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
+ break;
+ }
+
+ case CREATE_FEED_POLICY: {
+ handleCreateFeedPolicyStatement(metadataProvider, stmt, hcc);
+ break;
+ }
+
case QUERY: {
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
@@ -468,7 +512,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
ProgressState progress = ProgressState.NO_PROGRESS;
DatasetDecl dd = (DatasetDecl) stmt;
- String dataverseName = getActiveDataverseName(dd.getDataverse());
+ String dataverseName = getActiveDataverse(dd.getDataverse());
String datasetName = dd.getName().getValue();
DatasetType dsType = dd.getDatasetType();
String itemTypeName = dd.getItemTypeName().getValue();
@@ -647,6 +691,27 @@ public class AqlTranslator extends AbstractAqlTranslator {
}
}
+ /**
+  * Rejects an operation on a dataset that is the target of an active feed connection.
+  * <p>
+  * A connection is considered to involve the dataset only when both the dataverse and the dataset
+  * name match; dataset names alone are not unique across dataverses. The dataverse is read from the
+  * connection's feed id, since a {@code FeedConnectionId} is built from a single dataverse name
+  * shared by the feed and its target dataset.
+  *
+  * @param dataverseName
+  *            dataverse that contains the dataset
+  * @param datasetName
+  *            name of the dataset about to be modified
+  * @throws AsterixException
+  *             if at least one active feed connection targets the dataset; the message lists the
+  *             offending connections
+  */
+ private void validateIfResourceIsActiveInFeed(String dataverseName, String datasetName) throws AsterixException {
+     // NOTE(review): a null feed id appears to request the connections of all feeds - confirm.
+     List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+     if (activeFeedConnections == null || activeFeedConnections.isEmpty()) {
+         return;
+     }
+
+     StringBuilder builder = new StringBuilder();
+     for (FeedConnectionId connId : activeFeedConnections) {
+         // Matching on the dataset name alone would also block a same-named dataset in another dataverse.
+         if (connId.getFeedId().getDataverse().equals(dataverseName)
+                 && connId.getDatasetName().equals(datasetName)) {
+             builder.append(connId).append("\n");
+         }
+     }
+
+     // Every match appends at least a newline, so a non-empty builder means the dataset is in use.
+     if (builder.length() > 0) {
+         throw new AsterixException("Dataset " + datasetName + " is currently being "
+                 + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
+     }
+ }
+
private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
if (ngNameId != null) {
return ngNameId.getValue();
@@ -716,7 +781,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
IHyracksClientConnection hcc) throws Exception {
ProgressState progress = ProgressState.NO_PROGRESS;
CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName());
String datasetName = stmtCreateIndex.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -826,19 +891,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
}
if (ds.getDatasetType() == DatasetType.INTERNAL) {
- List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
- datasetName);
- if (feedActivities != null && !feedActivities.isEmpty()) {
- StringBuilder builder = new StringBuilder();
-
- for (FeedActivity fa : feedActivities) {
- builder.append(fa + "\n");
- }
- throw new AsterixException("Dataset" + datasetName
- + " is currently being fed into by the following feeds " + "." + builder.toString()
- + "\nOperation not supported.");
- }
-
+ validateIfResourceIsActiveInFeed(dataverseName, datasetName);
} else {
// External dataset
// Check if the dataset is indexible
@@ -1070,7 +1123,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
TypeDecl stmtCreateType = (TypeDecl) stmt;
- String dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtCreateType.getDataverseName());
String typeName = stmtCreateType.getIdent().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1130,21 +1183,26 @@ public class AqlTranslator extends AbstractAqlTranslator {
}
//# disconnect all feeds from any datasets in the dataverse.
- List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName, null);
+ List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
+ .getActiveFeedConnections(null);
DisconnectFeedStatement disStmt = null;
Identifier dvId = new Identifier(dataverseName);
- for (FeedActivity fa : feedActivities) {
- disStmt = new DisconnectFeedStatement(dvId, new Identifier(fa.getFeedName()), new Identifier(
- fa.getDatasetName()));
- try {
- handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Disconnected feed " + fa.getFeedName() + " from dataset " + fa.getDatasetName());
- }
- } catch (Exception exception) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to disconnect feed " + fa.getFeedName() + " from dataset "
- + fa.getDatasetName() + ". Encountered exception " + exception);
+ for (FeedConnectionId connection : activeFeedConnections) {
+ FeedId feedId = connection.getFeedId();
+ if (feedId.getDataverse().equals(dataverseName)) {
+ disStmt = new DisconnectFeedStatement(dvId, new Identifier(feedId.getFeedName()), new Identifier(
+ connection.getDatasetName()));
+ try {
+ handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Disconnected feed " + feedId.getFeedName() + " from dataset "
+ + connection.getDatasetName());
+ }
+ } catch (Exception exception) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to disconnect feed " + feedId.getFeedName() + " from dataset "
+ + connection.getDatasetName() + ". Encountered exception " + exception);
+ }
}
}
}
@@ -1258,7 +1316,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
DropStatement stmtDelete = (DropStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
String datasetName = stmtDelete.getDatasetName().getValue();
ProgressState progress = ProgressState.NO_PROGRESS;
@@ -1281,19 +1339,18 @@ public class AqlTranslator extends AbstractAqlTranslator {
}
}
+ Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
if (ds.getDatasetType() == DatasetType.INTERNAL) {
// prepare job spec(s) that would disconnect any active feeds involving the dataset.
- List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
- datasetName);
- List<JobSpecification> disconnectFeedJobSpecs = new ArrayList<JobSpecification>();
- if (feedActivities != null && !feedActivities.isEmpty()) {
- for (FeedActivity fa : feedActivities) {
- JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName,
- fa.getFeedName(), datasetName, metadataProvider, fa);
- disconnectFeedJobSpecs.add(jobSpec);
+ List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+ if (feedConnections != null && !feedConnections.isEmpty()) {
+ for (FeedConnectionId connection : feedConnections) {
+ Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
+ connection);
+ disconnectJobList.put(connection, p);
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Disconnected feed " + fa.getFeedName() + " from dataset " + datasetName
- + " as dataset is being dropped");
+ LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
+ + datasetName + " as dataset is being dropped");
}
}
}
@@ -1322,8 +1379,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
//# disconnect the feeds
- for (JobSpecification jobSpec : disconnectFeedJobSpecs) {
- runJob(hcc, jobSpec, true);
+ for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
+ runJob(hcc, p.first, true);
}
//#. run the jobs
@@ -1427,7 +1484,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
String datasetName = stmtIndexDrop.getDatasetName().getValue();
- String dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName());
ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
@@ -1447,19 +1504,24 @@ public class AqlTranslator extends AbstractAqlTranslator {
+ dataverseName);
}
- List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
- datasetName);
- if (feedActivities != null && !feedActivities.isEmpty()) {
+ List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
+ boolean resourceInUse = false;
+ if (feedConnections != null && !feedConnections.isEmpty()) {
StringBuilder builder = new StringBuilder();
-
- for (FeedActivity fa : feedActivities) {
- builder.append(fa + "\n");
+ for (FeedConnectionId connection : feedConnections) {
+ if (connection.getDatasetName().equals(datasetName)) {
+ resourceInUse = true;
+ builder.append(connection + "\n");
+ }
+ }
+ if (resourceInUse) {
+ throw new AsterixException("Dataset" + datasetName
+ + " is currently being fed into by the following feeds " + "." + builder.toString()
+ + "\nOperation not supported.");
}
- throw new AsterixException("Dataset" + datasetName
- + " is currently being fed into by the following feeds " + "." + builder.toString()
- + "\nOperation not supported.");
}
+
if (ds.getDatasetType() == DatasetType.INTERNAL) {
indexName = stmtIndexDrop.getIndexName().getValue();
Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
@@ -1620,7 +1682,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtTypeDrop.getDataverseName());
String typeName = stmtTypeDrop.getTypeName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1725,7 +1787,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
private void handleLoadStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
throws Exception {
LoadStatement loadStmt = (LoadStatement) stmt;
- String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
+ String dataverseName = getActiveDataverse(loadStmt.getDataverseName());
String datasetName = loadStmt.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
@@ -1756,7 +1818,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
IHyracksClientConnection hcc) throws Exception {
InsertStatement stmtInsert = (InsertStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtInsert.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtInsert.getDataverseName());
Query query = stmtInsert.getQuery();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
@@ -1792,7 +1854,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
IHyracksClientConnection hcc) throws Exception {
DeleteStatement stmtDelete = (DeleteStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1846,29 +1908,40 @@ public class AqlTranslator extends AbstractAqlTranslator {
IHyracksClientConnection hcc) throws Exception {
CreateFeedStatement cfs = (CreateFeedStatement) stmt;
- String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+ String dataverseName = getActiveDataverse(cfs.getDataverseName());
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
- String adapterName = null;
Feed feed = null;
try {
- adapterName = cfs.getAdapterName();
-
feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
if (feed != null) {
if (cfs.getIfNotExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return;
} else {
- throw new AlgebricksException("A feed with this name " + adapterName + " already exists.");
+ throw new AlgebricksException("A feed with this name " + feedName + " already exists.");
}
}
- feed = new Feed(dataverseName, feedName, adapterName, cfs.getAdapterConfiguration(),
- cfs.getAppliedFunction());
+ switch (stmt.getKind()) {
+ case CREATE_PRIMARY_FEED:
+ CreatePrimaryFeedStatement cpfs = (CreatePrimaryFeedStatement) stmt;
+ String adaptorName = cpfs.getAdaptorName();
+ feed = new PrimaryFeed(dataverseName, feedName, adaptorName, cpfs.getAdaptorConfiguration(),
+ cfs.getAppliedFunction());
+ break;
+ case CREATE_SECONDARY_FEED:
+ CreateSecondaryFeedStatement csfs = (CreateSecondaryFeedStatement) stmt;
+ feed = new SecondaryFeed(dataverseName, feedName, csfs.getSourceFeedName(),
+ csfs.getAppliedFunction());
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+
MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
@@ -1878,11 +1951,72 @@ public class AqlTranslator extends AbstractAqlTranslator {
MetadataLockManager.INSTANCE.createFeedEnd(dataverseName, dataverseName + "." + feedName);
}
}
+
+ /**
+  * Handles a create-feed-policy statement: registers a new feed policy in the active dataverse.
+  * <p>
+  * The new policy is either derived from an existing policy (looked up first in the active dataverse,
+  * then in the metadata dataverse) with the statement's properties layered on top, or loaded from a
+  * properties file named by the statement.
+  *
+  * @param metadataProvider
+  *            provider that is bound to the metadata transaction opened here
+  * @param stmt
+  *            the statement; must be a {@code CreateFeedPolicyStatement}
+  * @param hcc
+  *            Hyracks client connection (not used by this handler)
+  * @throws Exception
+  *             an {@code AlgebricksException} if the policy already exists (and "if not exists" was not
+  *             given), the source policy is unknown, or the policy file cannot be read; any other
+  *             failure is rethrown after the metadata transaction is aborted
+  */
+ private void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+         IHyracksClientConnection hcc) throws Exception {
+     // Resolve the names before opening the transaction so that a failure here cannot leave a
+     // transaction that is neither committed nor aborted.
+     CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
+     String dataverse = getActiveDataverse(null);
+     String policy = cfps.getPolicyName();
+
+     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+     metadataProvider.setMetadataTxnContext(mdTxnCtx);
+     MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." + policy);
+     try {
+         FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                 dataverse, policy);
+         if (feedPolicy != null) {
+             if (cfps.getIfNotExists()) {
+                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                 return;
+             } else {
+                 throw new AlgebricksException("A policy with this name " + policy + " already exists.");
+             }
+         }
+
+         boolean extendingExisting = cfps.getSourcePolicyName() != null;
+         String description = cfps.getDescription() == null ? "" : cfps.getDescription();
+         Map<String, String> policyProperties;
+         if (extendingExisting) {
+             FeedPolicy sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
+                     metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
+             if (sourceFeedPolicy == null) {
+                 sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                         MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+                 if (sourceFeedPolicy == null) {
+                     throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName());
+                 }
+             }
+             // Work on a copy: calling putAll on the map returned by the source policy would silently
+             // rewrite the source policy object itself.
+             policyProperties = new HashMap<String, String>(sourceFeedPolicy.getProperties());
+             policyProperties.putAll(cfps.getProperties());
+         } else {
+             Properties prop = new Properties();
+             InputStream stream = null;
+             try {
+                 stream = new FileInputStream(cfps.getSourcePolicyFile());
+                 prop.load(stream);
+             } catch (Exception e) {
+                 // Keep the cause so the underlying I/O problem is not lost.
+                 throw new AlgebricksException("Unable to read policy file " + cfps.getSourcePolicyFile(), e);
+             } finally {
+                 if (stream != null) {
+                     try {
+                         stream.close();
+                     } catch (java.io.IOException ignored) {
+                         // Best effort: the properties have already been read or the read already failed.
+                     }
+                 }
+             }
+             policyProperties = new HashMap<String, String>();
+             for (Entry<Object, Object> entry : prop.entrySet()) {
+                 policyProperties.put((String) entry.getKey(), (String) entry.getValue());
+             }
+         }
+
+         FeedPolicy newPolicy = new FeedPolicy(dataverse, policy, description, policyProperties);
+         MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, newPolicy);
+         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+     } catch (Exception e) {
+         abort(e, e, mdTxnCtx);
+         throw e;
+     } finally {
+         MetadataLockManager.INSTANCE.createFeedPolicyEnd(dataverse, dataverse + "." + policy);
+     }
+ }
private void handleDropFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
FeedDropStatement stmtFeedDrop = (FeedDropStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtFeedDrop.getDataverseName());
String feedName = stmtFeedDrop.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1896,27 +2030,25 @@ public class AqlTranslator extends AbstractAqlTranslator {
}
}
- List<FeedActivity> feedActivities;
- try {
- feedActivities = MetadataManager.INSTANCE.getConnectFeedActivitiesForFeed(mdTxnCtx, dataverseName,
- feedName);
+ FeedId feedId = new FeedId(dataverseName, feedName);
+ List<FeedConnectionId> activeConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(feedId);
+ if (activeConnections != null && !activeConnections.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+ for (FeedConnectionId connectionId : activeConnections) {
+ builder.append(connectionId.getDatasetName() + "\n");
+ }
+
+ throw new AlgebricksException("Feed " + feedId
+ + " is currently active and connected to the following dataset(s) \n" + builder.toString());
+ } else {
MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new MetadataException(e);
}
- List<JobSpecification> jobSpecs = new ArrayList<JobSpecification>();
- for (FeedActivity feedActivity : feedActivities) {
- JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName, feedName,
- feedActivity.getDatasetName(), metadataProvider, feedActivity);
- jobSpecs.add(jobSpec);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Removed feed " + feedId);
}
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- for (JobSpecification spec : jobSpecs) {
- runJob(hcc, spec, true);
- }
} catch (Exception e) {
abort(e, e, mdTxnCtx);
@@ -1925,78 +2057,106 @@ public class AqlTranslator extends AbstractAqlTranslator {
MetadataLockManager.INSTANCE.dropFeedEnd(dataverseName, dataverseName + "." + feedName);
}
}
+
+ /**
+  * Handles a drop-feed-policy statement: removes the named feed policy from its dataverse.
+  *
+  * @param metadataProvider
+  *            provider that is bound to the metadata transaction opened here
+  * @param stmt
+  *            the statement; must be a {@code FeedPolicyDropStatement}
+  * @param hcc
+  *            Hyracks client connection (not used by this handler)
+  * @throws Exception
+  *             an {@code AlgebricksException} if the policy does not exist and "if exists" was not
+  *             given; any other failure is rethrown after the metadata transaction is aborted
+  */
+ private void handleDropFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+         IHyracksClientConnection hcc) throws Exception {
+     // Resolve the names before opening the transaction so that a failure here cannot leave a
+     // transaction that is neither committed nor aborted.
+     FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt;
+     String dataverseName = getActiveDataverse(stmtFeedPolicyDrop.getDataverseName());
+     String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
+
+     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+     metadataProvider.setMetadataTxnContext(mdTxnCtx);
+     MetadataLockManager.INSTANCE.dropFeedPolicyBegin(dataverseName, dataverseName + "." + policyName);
+
+     try {
+         FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
+         if (feedPolicy == null) {
+             if (!stmtFeedPolicyDrop.getIfExists()) {
+                 throw new AlgebricksException("Unknown policy " + policyName + " in dataverse " + dataverseName);
+             }
+             // "if exists" was given and there is nothing to drop: finish cleanly instead of asking
+             // the metadata manager to drop a policy that is known not to exist.
+             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+             return;
+         }
+         MetadataManager.INSTANCE.dropFeedPolicy(mdTxnCtx, dataverseName, policyName);
+         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+     } catch (Exception e) {
+         abort(e, e, mdTxnCtx);
+         throw e;
+     } finally {
+         MetadataLockManager.INSTANCE.dropFeedPolicyEnd(dataverseName, dataverseName + "." + policyName);
+     }
+ }
+
private void handleConnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
- String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+ String dataverseName = getActiveDataverse(cfs.getDataverseName());
String feedName = cfs.getFeedName();
String datasetName = cfs.getDatasetName().getValue();
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ boolean readLatchAcquired = true;
+ boolean subscriberRegistered = false;
+ IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+ FeedConnectionId feedConnId = null;
+
MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, dataverseName
+ "." + feedName);
- boolean readLatchAcquired = true;
try {
-
metadataProvider.setWriteTransaction(true);
CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(), cfs
.getDatasetName().getValue(), cfs.getPolicy(), cfs.getQuery(), cfs.getVarCounter());
- Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
- dataverseName, cfs.getDatasetName().getValue());
- if (dataset == null) {
- throw new AsterixException("Unknown target dataset :" + cfs.getDatasetName().getValue());
- }
+ FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(),
+ metadataProvider.getMetadataTxnContext());
- if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
- throw new AsterixException("Statement not applicable. Dataset " + cfs.getDatasetName().getValue()
- + " is not of required type " + DatasetType.INTERNAL);
- }
+ Feed feed = FeedUtil.validateIfFeedExists(dataverseName, cfs.getFeedName(),
+ metadataProvider.getMetadataTxnContext());
- Feed feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName,
- cfs.getFeedName());
- if (feed == null) {
- throw new AsterixException("Unknown source feed: " + cfs.getFeedName());
- }
+ feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue());
- FeedConnectionId feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName()
- .getValue());
- FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(mdTxnCtx,
- feedConnId, null);
- boolean isFeedActive = FeedUtil.isFeedActive(recentActivity);
- if (isFeedActive && !cfs.forceConnect()) {
- throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
- + " is currently ACTIVE. Operation not supported");
+ if (FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId)) {
+ throw new AsterixException("Feed " + cfs.getFeedName() + " is already connected to dataset "
+ + cfs.getDatasetName().getValue());
}
- FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName,
- cbfs.getPolicyName());
- if (feedPolicy == null) {
- feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx,
- MetadataConstants.METADATA_DATAVERSE_NAME, cbfs.getPolicyName());
- if (feedPolicy == null) {
- throw new AsterixException("Unknown feed policy" + cbfs.getPolicyName());
- }
- }
+ FeedPolicy feedPolicy = FeedUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx);
- cfs.initialize(metadataProvider.getMetadataTxnContext(), dataset, feed);
- cbfs.setQuery(cfs.getQuery());
- metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
- metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
- JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
- JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnId, feedPolicy);
+ // All Metadata checks have passed. Feed connect request is valid. //
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Altered feed ingestion spec to wrap operators");
- }
+ FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
+ Pair<FeedConnectionRequest, Boolean> p = getFeedConnectionRequest(dataverseName, feed,
+ cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
+ FeedConnectionRequest connectionRequest = p.first;
+ boolean createFeedIntakeJob = p.second;
+ FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, eventSubscriber);
+ subscriberRegistered = true;
+ if (createFeedIntakeJob) {
+ FeedId feedId = connectionRequest.getFeedJointKey().getFeedId();
+ PrimaryFeed primaryFeed = (PrimaryFeed) MetadataManager.INSTANCE.getFeed(mdTxnCtx,
+ feedId.getDataverse(), feedId.getFeedName());
+ Pair<JobSpecification, IFeedAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
+ metadataProvider, policyAccessor);
+ runJob(hcc, pair.first, false);
+ IFeedAdapterFactory adapterFactory = pair.second;
+ if (adapterFactory.isRecordTrackingEnabled()) {
+ FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
+ adapterFactory.createIntakeProgressTracker());
+ }
+ eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+ }
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
+ readLatchAcquired = false;
+ eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+ if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
+ eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
+ }
String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
boolean waitForCompletion = waitForCompletionParam == null ? false : Boolean
.valueOf(waitForCompletionParam);
@@ -2005,7 +2165,6 @@ public class AqlTranslator extends AbstractAqlTranslator {
dataverseName + "." + feedName);
readLatchAcquired = false;
}
- runJob(hcc, newJobSpec, waitForCompletion);
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
@@ -2016,18 +2175,127 @@ public class AqlTranslator extends AbstractAqlTranslator {
MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
dataverseName + "." + feedName);
}
+ if (subscriberRegistered) {
+ FeedLifecycleListener.INSTANCE.deregisterFeedEventSubscriber(feedConnId, eventSubscriber);
+ }
+ }
+ }
+
+ /**
+ * Generates a subscription request corresponding to a connect feed request. In addition, provides a boolean
+ * flag indicating if feed intake job needs to be started (source primary feed not found to be active).
+ * <p>
+ * When no feed joint is reported for the feed's joint key, the method falls back to
+ * {@code getAvailableFeedJoint}; if that also yields nothing, a new INTAKE joint on the root/primary feed is
+ * created and the intake flag is set. Functions named in the requested key beyond the chosen source joint's
+ * key are collected as functions to apply, and a COMPUTE joint is created for them when there are any.
+ * Joints created here are registered with {@code FeedLifecycleListener}, and the request is added to the
+ * source joint before returning.
+ *
+ * @param dataverse
+ *            dataverse used to look up the root/primary feed when an intake joint must be created
+ * @param feed
+ *            the feed being connected
+ * @param dataset
+ *            name of the target dataset
+ * @param feedPolicy
+ *            policy whose name and properties are passed into the request
+ * @param mdTxnCtx
+ *            metadata transaction context used for the feed lookups
+ * @return the connection request paired with {@code true} if a feed intake job needs to be started
+ * @throws MetadataException
+ *             propagated from the underlying metadata lookups
+ */
+ private Pair<FeedConnectionRequest, Boolean> getFeedConnectionRequest(String dataverse, Feed feed, String dataset,
+ FeedPolicy feedPolicy, MetadataTransactionContext mdTxnCtx) throws MetadataException {
+ IFeedJoint sourceFeedJoint = null;
+ FeedConnectionRequest request = null;
+ List<String> functionsToApply = new ArrayList<String>();
+ boolean needIntakeJob = false;
+ List<IFeedJoint> jointsToRegister = new ArrayList<IFeedJoint>();
+ FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), dataset);
+
+ ConnectionLocation connectionLocation = null;
+ FeedJointKey feedJointKey = getFeedJointKey(feed, mdTxnCtx);
+ boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
+ if (!isFeedJointAvailable) {
+ sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
+ if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
+ connectionLocation = ConnectionLocation.SOURCE_FEED_INTAKE_STAGE;
+ FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
+ Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
+ FeedJointKey intakeFeedJointKey = new FeedJointKey(sourceFeedId, new ArrayList<String>()); // intake joint: no functions applied
+ sourceFeedJoint = new FeedJoint(intakeFeedJointKey, primaryFeed.getFeedId(), connectionLocation,
+ FeedJointType.INTAKE, connectionId);
+ jointsToRegister.add(sourceFeedJoint);
+ needIntakeJob = true;
+ } else {
+ connectionLocation = sourceFeedJoint.getConnectionLocation();
+ }
+
+ String[] functions = feedJointKey.getStringRep() // the requested key minus the source joint's key, split on ':'
+ .substring(sourceFeedJoint.getFeedJointKey().getStringRep().length()).trim().split(":");
+ for (String f : functions) {
+ if (f.trim().length() > 0) { // skip blank segments produced by the split
+ functionsToApply.add(f);
+ }
+ }
+ // register the compute feed point that represents the final output from the collection of
+ // functions that will be applied.
+ if (!functionsToApply.isEmpty()) {
+ FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
+ IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
+ ConnectionLocation.SOURCE_FEED_COMPUTE_STAGE, FeedJointType.COMPUTE, connectionId);
+ jointsToRegister.add(computeFeedJoint);
+ }
+ for (IFeedJoint joint : jointsToRegister) {
+ FeedLifecycleListener.INSTANCE.registerFeedJoint(joint);
+ }
+ } else {
+ sourceFeedJoint = FeedLifecycleListener.INSTANCE.getFeedJoint(feedJointKey);
+ connectionLocation = sourceFeedJoint.getConnectionLocation();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Feed joint " + sourceFeedJoint + " is available! need not apply any further computation");
+ }
+ }
+
+ request = new FeedConnectionRequest(sourceFeedJoint.getFeedJointKey(), connectionLocation, functionsToApply,
+ dataset, feedPolicy.getPolicyName(), feedPolicy.getProperties(), feed.getFeedId());
+
+ sourceFeedJoint.addConnectionRequest(request); // the request is recorded on the source joint itself
+ return new Pair<FeedConnectionRequest, Boolean>(request, needIntakeJob);
+ }
+
+ /*
+  * Builds the key of the feed joint at which the tuples of the given feed are available.
+  *
+  * Starting from the given feed, the chain of source feeds is followed for as long as the current
+  * feed is a secondary feed. The name of every applied function met on the way (including that of
+  * the feed where the walk stops) is put at the front of the list, so the list ends up ordered from
+  * the feed where the walk stops down to the given feed. The key pairs that final feed's id with
+  * the collected function names.
+  */
+ private FeedJointKey getFeedJointKey(Feed feed, MetadataTransactionContext ctx) throws MetadataException {
+     List<String> functionChain = new ArrayList<String>();
+     Feed current = feed;
+     boolean hasParent;
+     do {
+         hasParent = current.getFeedType().equals(FeedType.SECONDARY);
+         if (current.getAppliedFunction() != null) {
+             functionChain.add(0, current.getAppliedFunction().getName());
+         }
+         if (hasParent) {
+             // Source feeds are resolved in the dataverse of the feed the walk started from.
+             String parentName = ((SecondaryFeed) current).getSourceFeedName();
+             current = MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(), parentName);
+         }
+     } while (hasParent);
+
+     return new FeedJointKey(current.getFeedId(), functionChain);
+ }
+
private void handleDisconnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
- String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+ String dataverseName = getActiveDataverse(cfs.getDataverseName());
String datasetName = cfs.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ FeedUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
+ Feed feed = FeedUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
+
+ FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
+ boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId);
+ if (!isFeedConnectionActive) {
+ throw new AsterixException("Feed " + feed.getFeedId().getFeedName() + " is currently not connected to "
+ + cfs.getDatasetName().getValue() + ". Invalid operation!");
+ }
+
MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
dataverseName + "." + cfs.getFeedName());
try {
@@ -2037,47 +2305,87 @@ public class AqlTranslator extends AbstractAqlTranslator {
throw new AsterixException("Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse "
+ dataverseName);
}
- if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
- throw new AsterixException("Statement not applicable. Dataset " + cfs.getDatasetName().getValue()
- + " is not of required type " + DatasetType.INTERNAL);
+
+ Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations.buildDisconnectFeedJobSpec(
+ metadataProvider, connectionId);
+ JobSpecification jobSpec = specDisconnectType.first;
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ runJob(hcc, jobSpec, true);
+
+ if (!specDisconnectType.second) {
+ CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
+ FeedLifecycleListener.INSTANCE.reportPartialDisconnection(connectionId);
}
- Feed feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, cfs
- .getFeedName().getValue());
- if (feed == null) {
- throw new AsterixException("Unknown source feed :" + cfs.getFeedName());
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
}
+ throw e;
+ } finally {
+ MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + cfs.getFeedName());
+ }
+ }
+
+ private void handleSubscribeFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws Exception { // Handles a SubscribeFeedStatement: compiles its query, adapts the job spec for the feed connection, and submits it.
- FeedActivity feedActivity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(mdTxnCtx,
- new FeedConnectionId(dataverseName, feed.getFeedName(), datasetName), null);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Subscriber Feed Statement :" + stmt);
+ }
- boolean isFeedActive = FeedUtil.isFeedActive(feedActivity);
- if (!isFeedActive) {
- throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
- + " is currently INACTIVE. Operation not supported");
- }
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ metadataProvider.setWriteTransaction(true);
+ SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt;
+ bfs.initialize(metadataProvider.getMetadataTxnContext());
+ // Wrap the request, query and variable counter, and pass the compilation settings through the metadata provider's config.
+ CompiledSubscribeFeedStatement csfs = new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(),
+ bfs.getQuery(), bfs.getVarCounter());
+ metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
+ metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + bfs.getPolicy());
+ metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
+ StringUtils.join(bfs.getLocations(), ','));
+ // NOTE(review): compilation happens before the try below, so an exception raised here is not routed to abort(...) by this method - confirm.
+ JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs);
+ FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), bfs
+ .getSubscriptionRequest().getTargetDataset());
+ String dataverse = feedConnectionId.getFeedId().getDataverse();
+ String dataset = feedConnectionId.getDatasetName();
+ MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset, dataverse + "."
+ + feedConnectionId.getFeedId().getFeedName());
- JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName, cfs.getFeedName()
- .getValue(), cfs.getDatasetName().getValue(), metadataProvider, feedActivity);
+ try {
+ // The subscribeFeedBegin call above is paired with subscribeFeedEnd in the finally block.
+ JobSpecification alteredJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnectionId, bfs
+ .getSubscriptionRequest().getPolicyParameters());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, jobSpec, true);
+ // NOTE(review): compiled was already passed to alterJobSpecificationForFeed above; this null check comes after that use - confirm intent.
+ if (compiled != null) {
+ runJob(hcc, alteredJobSpec, false); // presumably false means "do not wait for completion" - confirm against runJob
+ }
+
} catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): prints the trace directly; the exception is also rethrown below
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
}
throw e;
} finally {
- MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
- dataverseName + "." + cfs.getFeedName());
+ MetadataLockManager.INSTANCE.subscribeFeedEnd(dataverse, dataverse + "." + dataset, dataverse + "."
+ + feedConnectionId.getFeedId().getFeedName());
}
}
private void handleCompactStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
CompactStatement compactStatement = (CompactStatement) stmt;
- String dataverseName = getActiveDataverseName(compactStatement.getDataverseName());
+ String dataverseName = getActiveDataverse(compactStatement.getDataverseName());
String datasetName = compactStatement.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
@@ -2261,7 +2569,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
private void handleExternalDatasetRefreshStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
RefreshExternalDatasetStatement stmtRefresh = (RefreshExternalDatasetStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtRefresh.getDataverseName());
+ String dataverseName = getActiveDataverse(stmtRefresh.getDataverseName());
String datasetName = stmtRefresh.getDatasetName().getValue();
ExternalDatasetTransactionState transactionState = ExternalDatasetTransactionState.COMMIT;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2503,8 +2811,8 @@ public class AqlTranslator extends AbstractAqlTranslator {
RunStatement pregelixStmt = (RunStatement) stmt;
boolean bActiveTxn = true;
- String dataverseNameFrom = getActiveDataverseName(pregelixStmt.getDataverseNameFrom());
- String dataverseNameTo = getActiveDataverseName(pregelixStmt.getDataverseNameTo());
+ String dataverseNameFrom = getActiveDataverse(pregelixStmt.getDataverseNameFrom());
+ String dataverseNameTo = getActiveDataverse(pregelixStmt.getDataverseNameTo());
String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
@@ -2745,7 +3053,7 @@ public class AqlTranslator extends AbstractAqlTranslator {
throw new AlgebricksException("dataverse not specified");
}
- private String getActiveDataverseName(Identifier dataverse) throws AlgebricksException {
+ private String getActiveDataverse(Identifier dataverse) throws AlgebricksException { // Identifier overload: forwards the identifier's value, or null when no identifier is given, to getActiveDataverseName.
return getActiveDataverseName(dataverse != null ? dataverse.getValue() : null);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java
new file mode 100644
index 0000000..359da16
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/CentralFeedManager.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.util.List;
+
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.metadata.feeds.SocketMessageListener;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class CentralFeedManager implements ICentralFeedManager {
+    // Single shared instance, created when the class is initialized.
+    private static final ICentralFeedManager centralFeedManager = new CentralFeedManager();
+
+    private final int port;
+    private final IFeedLoadManager feedLoadManager;
+    private final IFeedTrackingManager feedTrackingManager;
+    private final SocketMessageListener messageListener;
+
+    // Takes the port from the feed properties, then creates the managers and the message listener on it.
+    private CentralFeedManager() {
+        this.port = AsterixAppContextInfo.getInstance().getFeedProperties().getFeedCentralManagerPort();
+        this.feedLoadManager = new FeedLoadManager();
+        this.feedTrackingManager = new FeedTrackingManager();
+        this.messageListener = new SocketMessageListener(port, new FeedMessageReceiver(this));
+    }
+
+    public static ICentralFeedManager getInstance() {
+        return centralFeedManager;
+    }
+
+    @Override
+    public void start() throws AsterixException {
+        messageListener.start();
+    }
+
+    @Override
+    public void stop() throws AsterixException, IOException {
+        messageListener.stop();
+    }
+
+    @Override
+    public IFeedLoadManager getFeedLoadManager() {
+        return feedLoadManager;
+    }
+
+    @Override
+    public IFeedTrackingManager getFeedTrackingManager() {
+        return feedTrackingManager;
+    }
+
+    // Starts spec on the Hyracks connection from AsterixAppContextInfo; waits for completion only on request.
+    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
+        IHyracksClientConnection connection = AsterixAppContextInfo.getInstance().getHcc();
+        JobId startedJob = connection.startJob(spec);
+        if (!waitForCompletion) {
+            return startedJob;
+        }
+        connection.waitForCompletion(startedJob);
+        return startedJob;
+    }
+
+    // Parses AQL text and runs it with SYNC result delivery; the session uses a System.out writer and ADM format.
+    public static class AQLExecutor {
+        private static final PrintWriter out = new PrintWriter(System.out, true);
+
+        public static void executeAQL(String aql) throws Exception {
+            List<Statement> parsed = new AQLParser(new StringReader(aql)).Statement();
+            AqlTranslator translator = new AqlTranslator(parsed, new SessionConfig(out, OutputFormat.ADM));
+            IHyracksClientConnection connection = AsterixAppContextInfo.getInstance().getHcc();
+            translator.compileAndExecute(connection, null, AqlTranslator.ResultDelivery.SYNC);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java
new file mode 100644
index 0000000..db3ce77
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedCollectInfo.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedCollectInfo extends FeedInfo {
+    public FeedId sourceFeedId;
+    public FeedConnectionId feedConnectionId;
+    public List<String> collectLocations = new ArrayList<String>();
+    public List<String> computeLocations = new ArrayList<String>();
+    public List<String> storageLocations = new ArrayList<String>();
+    public Map<String, String> feedPolicy;
+    public String superFeedManagerHost;
+    public int superFeedManagerPort;
+    public boolean fullyConnected;
+    // Registers with the superclass as type COLLECT; the location lists start empty and fullyConnected starts true.
+    public FeedCollectInfo(FeedId sourceFeedId, FeedConnectionId feedConnectionId, JobSpecification jobSpec,
+            JobId jobId, Map<String, String> feedPolicy) {
+        super(jobSpec, jobId, FeedInfoType.COLLECT);
+        this.fullyConnected = true;
+        this.feedPolicy = feedPolicy;
+        this.feedConnectionId = feedConnectionId;
+        this.sourceFeedId = sourceFeedId;
+    }
+    // Text form: the COLLECT type followed by the connection id in square brackets.
+    @Override
+    public String toString() {
+        return String.format("%s[%s]", FeedInfoType.COLLECT, feedConnectionId);
+    }
+}