You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/26 15:29:32 UTC
[5/5] asterixdb git commit: Replace Servlets with Netty Based HTTP Servers
Replace Servlets with Netty Based HTTP Servers
Change-Id: I3d552d4eb8c868535ca4c41cbcf7e352217b18ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1429
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/60e7f12b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/60e7f12b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/60e7f12b
Branch: refs/heads/master
Commit: 60e7f12b47fa7e8e31d817ce2d46999908efb9d4
Parents: 9d30640
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Wed Jan 25 19:16:46 2017 -0800
Committer: Till Westmann <ti...@apache.org>
Committed: Thu Jan 26 07:28:19 2017 -0800
----------------------------------------------------------------------
asterixdb/asterix-app/pom.xml | 71 +--
.../asterix/api/http/server/ApiServlet.java | 237 ++++++++
.../api/http/server/ClusterApiServlet.java | 190 ++++++
.../ClusterControllerDetailsApiServlet.java | 107 ++++
.../api/http/server/ConnectorApiServlet.java | 194 +++++++
.../asterix/api/http/server/DdlApiServlet.java | 52 ++
.../api/http/server/DiagnosticsApiServlet.java | 125 ++++
.../asterix/api/http/server/FeedServlet.java | 98 ++++
.../asterix/api/http/server/FullApiServlet.java | 53 ++
.../server/NodeControllerDetailsApiServlet.java | 214 +++++++
.../api/http/server/QueryApiServlet.java | 50 ++
.../api/http/server/QueryResultApiServlet.java | 133 +++++
.../api/http/server/QueryServiceServlet.java | 557 ++++++++++++++++++
.../api/http/server/QueryStatusApiServlet.java | 118 ++++
.../http/server/QueryWebInterfaceServlet.java | 133 +++++
.../asterix/api/http/server/RestApiServlet.java | 248 ++++++++
.../api/http/server/ShutdownApiServlet.java | 107 ++++
.../api/http/server/UpdateApiServlet.java | 50 ++
.../api/http/server/VersionApiServlet.java | 75 +++
.../asterix/api/http/servlet/APIServlet.java | 205 -------
.../api/http/servlet/ClusterAPIServlet.java | 178 ------
.../servlet/ClusterCCDetailsAPIServlet.java | 101 ----
.../servlet/ClusterNodeDetailsAPIServlet.java | 215 -------
.../api/http/servlet/ConnectorAPIServlet.java | 181 ------
.../asterix/api/http/servlet/DDLAPIServlet.java | 51 --
.../api/http/servlet/DiagnosticsAPIServlet.java | 118 ----
.../asterix/api/http/servlet/FeedServlet.java | 85 ---
.../api/http/servlet/FullAPIServlet.java | 53 --
.../api/http/servlet/QueryAPIServlet.java | 50 --
.../api/http/servlet/QueryResultAPIServlet.java | 114 ----
.../api/http/servlet/QueryServiceServlet.java | 571 -------------------
.../api/http/servlet/QueryStatusAPIServlet.java | 101 ----
.../http/servlet/QueryWebInterfaceServlet.java | 132 -----
.../api/http/servlet/RESTAPIServlet.java | 252 --------
.../api/http/servlet/ShutdownAPIServlet.java | 92 ---
.../api/http/servlet/UpdateAPIServlet.java | 50 --
.../api/http/servlet/VersionAPIServlet.java | 55 --
.../app/external/FeedWorkCollection.java | 2 +-
.../apache/asterix/app/result/ResultUtil.java | 7 +-
.../asterix/app/translator/QueryTranslator.java | 4 +-
.../asterix/drivers/AsterixWebServer.java | 41 --
.../bootstrap/CCApplicationEntryPoint.java | 234 ++++----
.../asterix/hyracks/bootstrap/WebManager.java | 69 +++
.../http/servlet/ConnectorAPIServletTest.java | 177 ------
.../api/http/servlet/ConnectorApiLetTest.java | 178 ++++++
.../api/http/servlet/QueryServiceLetTest.java | 46 ++
.../http/servlet/QueryServiceServletTest.java | 45 --
.../api/http/servlet/VersionAPIServletTest.java | 120 ----
.../api/http/servlet/VersionApiLetTest.java | 120 ++++
.../apache/asterix/common/utils/LetUtil.java | 58 ++
.../asterix/common/utils/ServletUtil.java | 58 --
.../apache/asterix/test/aql/TestExecutor.java | 36 +-
.../server/test/SampleLocalClusterIT.java | 4 +-
asterixdb/pom.xml | 5 +
hyracks-fullstack/hyracks/hyracks-http/pom.xml | 34 ++
.../hyracks/http/server/AbstractServlet.java | 69 +++
.../http/server/ChunkedNettyOutputStream.java | 108 ++++
.../hyracks/http/server/ChunkedResponse.java | 113 ++++
.../hyracks/http/server/FullResponse.java | 96 ++++
.../apache/hyracks/http/server/GetRequest.java | 50 ++
.../apache/hyracks/http/server/HttpServer.java | 225 ++++++++
.../hyracks/http/server/HttpServerHandler.java | 131 +++++
.../http/server/HttpServerInitializer.java | 47 ++
.../apache/hyracks/http/server/IServlet.java | 92 +++
.../hyracks/http/server/IServletRequest.java | 63 ++
.../hyracks/http/server/IServletResponse.java | 77 +++
.../apache/hyracks/http/server/PostRequest.java | 59 ++
hyracks-fullstack/hyracks/pom.xml | 1 +
68 files changed, 4540 insertions(+), 3245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index f4667d0..77ca6ef 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -32,12 +32,10 @@
<comments>A business-friendly OSS license</comments>
</license>
</licenses>
-
<properties>
<appendedResourcesDirectory>${basedir}/src/main/appended-resources</appendedResourcesDirectory>
<sonar.sources>pom.xml,src/main/java,src/main/resources</sonar.sources>
</properties>
-
<build>
<plugins>
<plugin>
@@ -62,7 +60,6 @@
</goals>
</execution>
</executions>
-
<configuration>
<!--
If you'd like to tell the plugin where your .git directory is,
@@ -80,7 +77,6 @@
at source assembly time) is used in this case -->
<failOnNoGitDirectory>false</failOnNoGitDirectory>
</configuration>
-
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
@@ -165,33 +161,33 @@
</plugin>
</plugins>
<pluginManagement>
- <plugins>
+ <plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-test-datagenerator-maven-plugin</artifactId>
- <versionRange>[0.8.9-SNAPSHOT,)</versionRange>
- <goals>
- <goal>generate-testdata</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-test-datagenerator-maven-plugin</artifactId>
+ <versionRange>[0.8.9-SNAPSHOT,)</versionRange>
+ <goals>
+ <goal>generate-testdata</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore />
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
</pluginManagement>
</build>
<dependencies>
@@ -201,10 +197,6 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
@@ -228,6 +220,10 @@
<artifactId>hyracks-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-http</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-algebra</artifactId>
<version>${project.version}</version>
@@ -484,5 +480,10 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.1.6.Final</version>
+ </dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
new file mode 100644
index 0000000..c38e0a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.imageio.ImageIO;
+
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.result.ResultUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.aql.parser.TokenMgrError;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Servlet behind the web query interface.
+ * <p>
+ * GET requests serve classpath resources: the query page for {@code "/"}, otherwise the resource named by the
+ * request URI. POST requests parse the submitted {@code query} parameter, execute the resulting statements and
+ * write the outcome to the response as HTML.
+ */
+public class ApiServlet extends AbstractServlet {
+
+    private static final Logger LOGGER = Logger.getLogger(ApiServlet.class.getName());
+    public static final String HTML_STATEMENT_SEPARATOR = "<!-- BEGIN -->";
+
+    private final ILangCompilationProvider aqlCompilationProvider;
+    private final ILangCompilationProvider sqlppCompilationProvider;
+    private final IStatementExecutorFactory statementExecutorFactory;
+
+    /**
+     * @param ctx
+     *            shared context map; the Hyracks connection and dataset are looked up in it per request
+     * @param paths
+     *            forwarded unchanged to {@link AbstractServlet} (presumably the paths this servlet serves)
+     * @param aqlCompilationProvider
+     *            provider used when the request's {@code query-language} parameter is {@code "AQL"}
+     * @param sqlppCompilationProvider
+     *            provider used for every other {@code query-language} value, including none
+     * @param statementExecutorFactory
+     *            factory for the executor that compiles and runs the parsed statements
+     */
+    public ApiServlet(ConcurrentMap<String, Object> ctx, String[] paths,
+            ILangCompilationProvider aqlCompilationProvider, ILangCompilationProvider sqlppCompilationProvider,
+            IStatementExecutorFactory statementExecutorFactory) {
+        super(ctx, paths);
+        this.aqlCompilationProvider = aqlCompilationProvider;
+        this.sqlppCompilationProvider = sqlppCompilationProvider;
+        this.statementExecutorFactory = statementExecutorFactory;
+    }
+
+    /**
+     * Parses and executes the statements in the {@code query} parameter and writes the results, followed by the
+     * total duration, to the response. Parse and execution failures are reported through {@link ResultUtil}
+     * rather than thrown.
+     */
+    public void doPost(IServletRequest request, IServletResponse response) {
+        // Query language
+        ILangCompilationProvider compilationProvider = "AQL".equals(request.getParameter("query-language"))
+                ? aqlCompilationProvider : sqlppCompilationProvider;
+        IParserFactory parserFactory = compilationProvider.getParserFactory();
+
+        // Output format.
+        PrintWriter out = response.writer();
+        boolean csvAndHeader = false;
+        OutputFormat format = parseOutputFormat(request.getParameter("output-format"));
+
+        String query = request.getParameter("query");
+        String wrapperArray = request.getParameter("wrapper-array");
+        String printExprParam = request.getParameter("print-expr-tree");
+        String printRewrittenExprParam = request.getParameter("print-rewritten-expr-tree");
+        String printLogicalPlanParam = request.getParameter("print-logical-plan");
+        String printOptimizedLogicalPlanParam = request.getParameter("print-optimized-logical-plan");
+        String printJob = request.getParameter("print-job");
+        String executeQuery = request.getParameter("execute-query");
+        try {
+            IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failure setting content type", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            return;
+        }
+        try {
+            IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+            IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+            if (hds == null) {
+                // The dataset is created on first use and cached in ctx; the lookup is repeated while holding
+                // the lock so that concurrent first requests do not each create one.
+                synchronized (ctx) {
+                    hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+                    if (hds == null) {
+                        hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                        ctx.put(HYRACKS_DATASET_ATTR, hds);
+                    }
+                }
+            }
+            IParser parser = parserFactory.createParser(query);
+            List<Statement> aqlStatements = parser.parse();
+            SessionConfig sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true);
+            sessionConfig.set(SessionConfig.FORMAT_HTML, true);
+            sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
+            sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, isSet(wrapperArray));
+            sessionConfig.setOOBData(isSet(printExprParam), isSet(printRewrittenExprParam),
+                    isSet(printLogicalPlanParam), isSet(printOptimizedLogicalPlanParam), isSet(printJob));
+            MetadataManager.INSTANCE.init();
+            IStatementExecutor translator =
+                    statementExecutorFactory.create(aqlStatements, sessionConfig, compilationProvider);
+            double duration;
+            long startTime = System.currentTimeMillis();
+            translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE);
+            long endTime = System.currentTimeMillis();
+            duration = (endTime - startTime) / 1000.00;
+            out.println(HTML_STATEMENT_SEPARATOR);
+            out.println("<PRE>Duration of all jobs: " + duration + " sec</PRE>");
+        } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, pe.toString(), pe);
+            ResultUtil.webUIParseExceptionHandler(out, pe, query);
+        } catch (Exception e) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+            ResultUtil.webUIErrorHandler(out, e);
+        }
+    }
+
+    /**
+     * Maps the {@code output-format} request parameter to an {@link OutputFormat}, falling back to
+     * {@link OutputFormat#CLEAN_JSON} when the parameter is missing or names no known format.
+     * <p>
+     * The missing case is checked explicitly because {@code valueOf(null)} throws NullPointerException rather
+     * than IllegalArgumentException.
+     */
+    private static OutputFormat parseOutputFormat(String output) {
+        if (output == null) {
+            LOGGER.log(Level.INFO, "output-format not specified, using " + OutputFormat.CLEAN_JSON + " instead");
+            return OutputFormat.CLEAN_JSON;
+        }
+        try {
+            return OutputFormat.valueOf(output);
+        } catch (IllegalArgumentException e) {
+            LOGGER.log(Level.INFO,
+                    output + ": unsupported output-format, using " + OutputFormat.CLEAN_JSON + " instead", e);
+            // Default output format
+            return OutputFormat.CLEAN_JSON;
+        }
+    }
+
+    /**
+     * Serves a classpath resource. {@code "/"} maps to the query page; any other request URI is used as the
+     * resource path as-is. Responds with NOT_FOUND when the resource does not exist and with
+     * INTERNAL_SERVER_ERROR on I/O failure or when a {@code .png} resource cannot be decoded.
+     */
+    public void doGet(IServletRequest request, IServletResponse response) {
+        String resourcePath = null;
+        String requestURI = request.getHttpRequest().uri();
+
+        if ("/".equals(requestURI)) {
+            try {
+                IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+            } catch (IOException e) {
+                LOGGER.log(Level.WARNING, "Failure setting content type", e);
+                response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                return;
+            }
+            resourcePath = "/webui/querytemplate.html";
+        } else {
+            resourcePath = requestURI;
+        }
+
+        try (InputStream is = ApiServlet.class.getResourceAsStream(resourcePath)) {
+            if (is == null) {
+                response.setStatus(HttpResponseStatus.NOT_FOUND);
+                return;
+            }
+            // Special handler for .png resources: they are binary and must not go through the text writer
+            if (resourcePath.endsWith(".png")) {
+                BufferedImage img = ImageIO.read(is);
+                if (img == null) {
+                    // ImageIO.read returns null when no registered reader can decode the stream; passing
+                    // that on to ImageIO.write would throw an IllegalArgumentException nobody catches.
+                    LOGGER.log(Level.WARNING, "Unable to decode image resource " + resourcePath);
+                    response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                    return;
+                }
+                IServletResponse.setContentType(response, IServlet.ContentType.IMG_PNG);
+                OutputStream outputStream = response.outputStream();
+                String formatName = "png";
+                ImageIO.write(img, formatName, outputStream);
+                outputStream.close();
+                return;
+            }
+            String type = IServlet.ContentType.mime(QueryWebInterfaceServlet.extension(resourcePath));
+            IServletResponse.setContentType(response, "".equals(type) ? IServlet.ContentType.TEXT_PLAIN : type,
+                    IServlet.Encoding.UTF8);
+            writeOutput(response, is, resourcePath);
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failure handling request", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            return;
+        }
+    }
+
+    /**
+     * Copies a text resource to the response writer. The resource is read line by line and the lines are
+     * concatenated without their terminators, followed by a single trailing newline.
+     *
+     * @throws IOException
+     *             if reading the resource fails
+     */
+    private void writeOutput(IServletResponse response, InputStream is, String resourcePath) throws IOException {
+        // Decode explicitly as UTF-8 (the encoding declared on the response) instead of the platform default.
+        try (InputStreamReader isr = new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8);
+                BufferedReader br = new BufferedReader(isr)) {
+            StringBuilder sb = new StringBuilder();
+            String line;
+            try {
+                line = br.readLine();
+            } catch (NullPointerException e) {
+                LOGGER.log(Level.WARNING,
+                        "NPE reading resource " + resourcePath + ", assuming JDK-8080094; returning 404", e);
+                // workaround lame JDK bug where a broken InputStream is returned in case the resourcePath is a
+                // directory; see https://bugs.openjdk.java.net/browse/JDK-8080094
+                response.setStatus(HttpResponseStatus.NOT_FOUND);
+                return;
+            }
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+            }
+            PrintWriter out = response.writer();
+            out.println(sb.toString());
+        }
+    }
+
+    /**
+     * @return {@code true} only when the parameter is exactly {@code "true"}; {@code null} counts as unset
+     */
+    private static boolean isSet(String requestParameter) {
+        return "true".equals(requestParameter);
+    }
+
+    /**
+     * Dispatches GET to {@link #doGet} and POST to {@link #doPost}; every other method is answered with
+     * METHOD_NOT_ALLOWED. The status is preset to OK before dispatching.
+     */
+    @Override
+    public void handle(IServletRequest request, IServletResponse response) {
+        response.setStatus(HttpResponseStatus.OK);
+        if (request.getHttpRequest().method() == HttpMethod.GET) {
+            doGet(request, response);
+        } else if (request.getHttpRequest().method() == HttpMethod.POST) {
+            doPost(request, response);
+        } else {
+            response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
new file mode 100644
index 0000000..6fd6c47
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+
+import org.apache.asterix.common.config.AbstractProperties;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.utils.JSONUtil;
+import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * GET-only servlet that reports cluster information as JSON. The sub-path of the request selects the
+ * document: the full cluster state (empty sub-path), {@code /replication} or {@code /summary}.
+ */
+public class ClusterApiServlet extends AbstractServlet {
+
+    private static final Logger LOGGER = Logger.getLogger(ClusterApiServlet.class.getName());
+    private static final Pattern PARENT_DIR = Pattern.compile("/[^./]+/\\.\\./");
+    private static final Pattern REPLICATION_PROPERTY = Pattern.compile("^replication\\.");
+    protected static final String NODE_ID_KEY = "node_id";
+    protected static final String CONFIG_URI_KEY = "configUri";
+    protected static final String STATS_URI_KEY = "statsUri";
+    protected static final String THREAD_DUMP_URI_KEY = "threadDumpUri";
+    protected static final String SHUTDOWN_URI_KEY = "shutdownUri";
+    protected static final String FULL_SHUTDOWN_URI_KEY = "fullShutdownUri";
+    protected static final String VERSION_URI_KEY = "versionUri";
+    protected static final String DIAGNOSTICS_URI_KEY = "diagnosticsUri";
+    protected static final String REPLICATION_URI_KEY = "replicationUri";
+    private final ObjectMapper om = new ObjectMapper();
+
+    public ClusterApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+    }
+
+    /**
+     * Writes the JSON document selected by the request's sub-path. An unknown sub-path yields NOT_FOUND; any
+     * other failure yields INTERNAL_SERVER_ERROR with the exception text as the body.
+     *
+     * @throws IOException
+     *             if the content type cannot be set on the response
+     */
+    protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException {
+        IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+        PrintWriter writer = response.writer();
+        try {
+            response.setStatus(HttpResponseStatus.OK);
+            ObjectNode document = selectDocument(request);
+            writer.write(JSONUtil.convertNode(document));
+        } catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
+            response.setStatus(HttpResponseStatus.NOT_FOUND);
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, "exception thrown for " + request, e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            writer.write(e.toString());
+        }
+        writer.flush();
+    }
+
+    /**
+     * Picks the JSON document for the request's sub-path.
+     *
+     * @throws IllegalArgumentException
+     *             if the sub-path is not one this servlet serves
+     */
+    private ObjectNode selectDocument(IServletRequest request) {
+        switch (path(request)) {
+            case "":
+                return getClusterStateJSON(request, "");
+            case "/replication":
+                return getReplicationJSON();
+            case "/summary":
+                return getClusterStateSummaryJSON();
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    protected ObjectNode getClusterStateSummaryJSON() {
+        return ClusterStateManager.INSTANCE.getClusterStateSummary();
+    }
+
+    /**
+     * Builds the replication document from the first {@link ReplicationProperties} instance found; the
+     * leading {@code replication.} is stripped from each property key.
+     *
+     * @throws IllegalStateException
+     *             if no replication properties instance exists
+     */
+    protected ObjectNode getReplicationJSON() {
+        for (AbstractProperties candidate : getPropertiesInstances()) {
+            if (!(candidate instanceof ReplicationProperties)) {
+                continue;
+            }
+            ObjectNode document = om.createObjectNode();
+            document.putPOJO("config",
+                    candidate.getProperties(name -> REPLICATION_PROPERTY.matcher(name).replaceFirst("")));
+            return document;
+        }
+        throw new IllegalStateException("ERROR: replication properties not found");
+    }
+
+    /**
+     * Merges the properties of every properties instance except the replication ones into a single map.
+     */
+    protected Map<String, Object> getAllClusterProperties() {
+        Map<String, Object> merged = new HashMap<>();
+        for (AbstractProperties candidate : getPropertiesInstances()) {
+            if (candidate instanceof ReplicationProperties) {
+                continue;
+            }
+            merged.putAll(candidate.getProperties());
+        }
+        return merged;
+    }
+
+    protected List<AbstractProperties> getPropertiesInstances() {
+        return AbstractProperties.getImplementations();
+    }
+
+    /**
+     * Returns the cluster state description enriched with the merged configuration and with links to the
+     * related endpoints (per-node, cluster controller, replication and admin URIs).
+     *
+     * @param request
+     *            supplies the Host header and request URI the links are built from
+     * @param pathToNode
+     *            relative path appended to the request URL before it is canonicalized
+     */
+    protected ObjectNode getClusterStateJSON(IServletRequest request, String pathToNode) {
+        ObjectNode state = ClusterStateManager.INSTANCE.getClusterStateDescription();
+        state.putPOJO("config", getAllClusterProperties());
+
+        ArrayNode nodes = (ArrayNode) state.get("ncs");
+        final StringBuilder base = new StringBuilder("http://").append(request.getHeader(HttpHeaderNames.HOST))
+                .append(request.getHttpRequest().uri());
+        if (base.charAt(base.length() - 1) != '/') {
+            base.append('/');
+        }
+        base.append(pathToNode);
+        String clusterURL = canonicalize(base);
+        String adminURL = canonicalize(clusterURL + "../");
+        String nodeURL = clusterURL + "node/";
+        for (JsonNodeHolder holder = new JsonNodeHolder(nodes); holder.hasNext();) {
+            ObjectNode node = holder.next();
+            String nodePrefix = nodeURL + node.get(NODE_ID_KEY).asText();
+            node.put(CONFIG_URI_KEY, nodePrefix + "/config");
+            node.put(STATS_URI_KEY, nodePrefix + "/stats");
+            node.put(THREAD_DUMP_URI_KEY, nodePrefix + "/threaddump");
+        }
+        ObjectNode controller;
+        if (!state.has("cc")) {
+            controller = om.createObjectNode();
+            state.set("cc", controller);
+        } else {
+            controller = (ObjectNode) state.get("cc");
+        }
+        controller.put(CONFIG_URI_KEY, clusterURL + "cc/config");
+        controller.put(STATS_URI_KEY, clusterURL + "cc/stats");
+        controller.put(THREAD_DUMP_URI_KEY, clusterURL + "cc/threaddump");
+        state.put(REPLICATION_URI_KEY, clusterURL + "replication");
+        state.put(SHUTDOWN_URI_KEY, adminURL + "shutdown");
+        state.put(FULL_SHUTDOWN_URI_KEY, adminURL + "shutdown?all=true");
+        state.put(VERSION_URI_KEY, adminURL + "version");
+        state.put(DIAGNOSTICS_URI_KEY, adminURL + "diagnostics");
+        return state;
+    }
+
+    /**
+     * Index-based cursor over the entries of an {@link ArrayNode}, each cast to {@link ObjectNode}. The size
+     * is re-read on every step, exactly as an indexed {@code for} loop over the array would.
+     */
+    private static final class JsonNodeHolder {
+        private final ArrayNode array;
+        private int position;
+
+        JsonNodeHolder(ArrayNode array) {
+            this.array = array;
+        }
+
+        boolean hasNext() {
+            return position < array.size();
+        }
+
+        ObjectNode next() {
+            return (ObjectNode) array.get(position++);
+        }
+    }
+
+    /**
+     * Collapses every {@code /segment/../} in the URL to {@code /}, repeating until a pass changes nothing.
+     */
+    private String canonicalize(CharSequence requestURL) {
+        String current = requestURL.toString();
+        String previous;
+        do {
+            previous = current;
+            current = PARENT_DIR.matcher(previous).replaceAll("/");
+        } while (!previous.equals(current));
+        return previous;
+    }
+
+    /**
+     * Serves GET requests through {@link #getUnsafe}; any other method is answered with METHOD_NOT_ALLOWED.
+     * An IOException escaping {@code getUnsafe} is logged and not propagated.
+     */
+    @Override
+    public void handle(IServletRequest request, IServletResponse response) {
+        if (request.getHttpRequest().method() == HttpMethod.GET) {
+            try {
+                getUnsafe(request, response);
+            } catch (IOException e) {
+                LOGGER.log(Level.WARNING, "Unhandled IOException thrown from " + getClass().getName() + " get impl",
+                        e);
+            }
+        } else {
+            response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
new file mode 100644
index 0000000..4419e8a
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Servlet reporting details about the cluster controller as JSON: the {@code cc} section of the cluster state
+ * for an empty sub-path, or the controller's {@code config}, {@code stats} or {@code threaddump} document.
+ */
+public class ClusterControllerDetailsApiServlet extends ClusterApiServlet {
+
+    private static final Logger LOGGER = Logger.getLogger(ClusterControllerDetailsApiServlet.class.getName());
+    private final ObjectMapper om = new ObjectMapper();
+
+    public ClusterControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+    }
+
+    /**
+     * Writes the pretty-printed JSON document selected by the request's sub-path. An unknown sub-path yields
+     * NOT_FOUND; any other failure yields INTERNAL_SERVER_ERROR with the exception text as the body.
+     */
+    @Override
+    protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException {
+        PrintWriter responseWriter = response.writer();
+        IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+        try {
+            ObjectNode json;
+            response.setStatus(HttpResponseStatus.OK);
+            if ("".equals(path(request))) {
+                json = (ObjectNode) getClusterStateJSON(request, "../").get("cc");
+            } else {
+                json = processNode(request, hcc);
+            }
+            IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+            // Reuse the mapper held by this servlet instead of allocating a new one for every request.
+            responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json));
+        } catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
+            response.setStatus(HttpResponseStatus.NOT_FOUND);
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, "exception thrown for " + request, e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            responseWriter.write(e.toString());
+        }
+        responseWriter.flush();
+    }
+
+    /**
+     * Resolves a sub-path to the corresponding cluster controller document.
+     *
+     * @throws IllegalArgumentException
+     *             if the sub-path ends with {@code /}, has more than one segment, or names an unknown document
+     */
+    private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc)
+            throws Exception {
+        String pathInfo = path(request);
+        // Must be tested before substring(1): on an empty path that call throws
+        // StringIndexOutOfBoundsException, so a later check could never be reached. getUnsafe currently
+        // handles the empty path itself, which makes this branch purely defensive.
+        if ("".equals(pathInfo)) {
+            return (ObjectNode) getClusterStateJSON(request, "../../").get("cc");
+        }
+        if (pathInfo.endsWith("/")) {
+            throw new IllegalArgumentException();
+        }
+        String[] parts = pathInfo.substring(1).split("/");
+        if (parts.length != 1) {
+            throw new IllegalArgumentException();
+        }
+        switch (parts[0]) {
+            case "config":
+                return om.readValue(hcc.getNodeDetailsJSON(null, false, true), ObjectNode.class);
+            case "stats":
+                return om.readValue(hcc.getNodeDetailsJSON(null, true, false), ObjectNode.class);
+            case "threaddump":
+                return processCCThreadDump(hcc);
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    /**
+     * Fetches the thread dump obtained with a {@code null} node id and parses it as JSON.
+     *
+     * @throws IllegalArgumentException
+     *             if no dump is returned
+     */
+    private ObjectNode processCCThreadDump(IHyracksClientConnection hcc) throws Exception {
+        String dump = hcc.getThreadDump(null);
+        if (dump == null) {
+            throw new IllegalArgumentException();
+        }
+        return (ObjectNode) om.readTree(dump);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
new file mode 100644
index 0000000..d832672
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.util.FlushDatasetUtils;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * The REST API that takes a dataverse name and a dataset name as input
+ * and returns the file splits (IP, file-path) of the dataset as a JSON object,
+ * together with the dataset's record type, primary keys and temp flag.
+ * It is mostly used by external runtimes, e.g., Pregelix or IMRU, to pull data
+ * in parallel from existing AsterixDB datasets.
+ */
+public class ConnectorApiServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(ConnectorApiServlet.class.getName());
+    // Shared mapper used only as a factory for JSON nodes.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public ConnectorApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+    }
+
+    /**
+     * Handles a GET request carrying the {@code dataverseName} and {@code datasetName} parameters.
+     * <p>
+     * Non-GET requests are answered with 405. A missing parameter or an unknown dataset is reported
+     * as a JSON object with an "error" field. Any failure while serving the request aborts the
+     * metadata transaction, sets status 500 and writes the exception to the response.
+     *
+     * @param request the incoming servlet request
+     * @param response the response the JSON document (or error text) is written to
+     */
+    @Override
+    public void handle(IServletRequest request, IServletResponse response) {
+        if (request.getHttpRequest().method() != HttpMethod.GET) {
+            response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+            return;
+        }
+        response.setStatus(HttpResponseStatus.OK);
+        try {
+            IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failure setting content type", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            return;
+        }
+        PrintWriter out = response.writer();
+        // Non-null only while a metadata transaction is open and still has to be ended.
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            ObjectNode jsonResponse = OBJECT_MAPPER.createObjectNode();
+            String dataverseName = request.getParameter("dataverseName");
+            String datasetName = request.getParameter("datasetName");
+            if (dataverseName == null || datasetName == null) {
+                jsonResponse.put("error", "Parameter dataverseName or datasetName is null,");
+                out.write(jsonResponse.toString());
+                out.flush();
+                return;
+            }
+
+            IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+            // Metadata transaction begins.
+            MetadataManager.INSTANCE.init();
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            // Retrieves file splits of the dataset.
+            MetadataProvider metadataProvider = new MetadataProvider(null);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+            if (dataset == null) {
+                // Only a lookup was performed; end the transaction instead of leaving it open.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                mdTxnCtx = null;
+                jsonResponse.put("error",
+                        "Dataset " + datasetName + " does not exist in " + "dataverse " + dataverseName);
+                out.write(jsonResponse.toString());
+                out.flush();
+                return;
+            }
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            FileSplit[] fileSplits = metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName,
+                    datasetName, temp);
+            ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
+                    dataset.getItemTypeName());
+            List<List<String>> primaryKeys = DatasetUtils.getPartitioningKeys(dataset);
+            StringBuilder pkStrBuf = new StringBuilder();
+            for (List<String> keys : primaryKeys) {
+                for (String key : keys) {
+                    pkStrBuf.append(key).append(",");
+                }
+            }
+            // Drop the trailing comma; guarded so that an empty key list no longer throws.
+            if (pkStrBuf.length() > 0) {
+                pkStrBuf.setLength(pkStrBuf.length() - 1);
+            }
+
+            // Constructs the returned json object.
+            formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), temp,
+                    hcc.getNodeControllerInfos());
+
+            // Flush the cached contents of the dataset to file system.
+            FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseName, datasetName, datasetName);
+
+            // Metadata transaction commits.
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            mdTxnCtx = null;
+            // Writes file splits.
+            out.write(jsonResponse.toString());
+            out.flush();
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Failure handling a request", e);
+            abortQuietly(mdTxnCtx);
+            // Report the failure to the client instead of leaving the initial 200 in place.
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            out.println(e.getMessage());
+            // NOTE(review): the stack trace is still sent to the client as before; consider
+            // dropping it if this endpoint is reachable by untrusted callers.
+            e.printStackTrace(out);
+            // Flush last so the stack trace is part of what gets flushed.
+            out.flush();
+        }
+    }
+
+    /**
+     * Aborts the given metadata transaction, logging (not propagating) any failure of the abort itself.
+     *
+     * @param mdTxnCtx the open transaction, or {@code null} if there is nothing to abort
+     */
+    private static void abortQuietly(MetadataTransactionContext mdTxnCtx) {
+        if (mdTxnCtx == null) {
+            return;
+        }
+        try {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+        } catch (Exception abortFailure) {
+            LOGGER.log(Level.WARNING, "Failure aborting metadata transaction", abortFailure);
+        }
+    }
+
+    /**
+     * Fills {@code jsonResponse} with the "temp", "keys", "type" and "splits" fields.
+     *
+     * @param jsonResponse the object to populate
+     * @param fileSplits the file splits of the dataset; one "splits" entry is produced per split
+     * @param recordType the record type whose JSON form is stored under "type"
+     * @param primaryKeys the comma-separated key names stored under "keys"
+     * @param temp whether the dataset is temporary
+     * @param nodeMap node controller infos keyed by node name, used to look up each split's address
+     * @throws IllegalStateException if a split refers to a node that is absent from {@code nodeMap}
+     */
+    private void formResponseObject(ObjectNode jsonResponse, FileSplit[] fileSplits, ARecordType recordType,
+            String primaryKeys, boolean temp, Map<String, NodeControllerInfo> nodeMap) {
+        ArrayNode partitions = OBJECT_MAPPER.createArrayNode();
+        // Whether the dataset is temp or not
+        jsonResponse.put("temp", temp);
+        // Adds a primary key.
+        jsonResponse.put("keys", primaryKeys);
+        // Adds record type.
+        jsonResponse.set("type", recordType.toJSON());
+        // Generates file partitions.
+        for (FileSplit split : fileSplits) {
+            NodeControllerInfo ncInfo = nodeMap.get(split.getNodeName());
+            if (ncInfo == null) {
+                // Fail with a descriptive message rather than a bare NullPointerException.
+                throw new IllegalStateException("No node controller info for node " + split.getNodeName());
+            }
+            String ipAddress = ncInfo.getNetworkAddress().getAddress();
+            String path = split.getPath();
+            FilePartition partition = new FilePartition(ipAddress, path);
+            partitions.add(partition.toObjectNode());
+        }
+        // Generates the response object which contains the splits.
+        jsonResponse.set("splits", partitions);
+    }
+}
+
+/**
+ * Immutable pair of an IP address and a file path describing where one partition of a dataset lives.
+ */
+class FilePartition {
+    // Shared node factory; avoids allocating a new ObjectMapper for every partition.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final String ipAddress;
+    private final String path;
+
+    /**
+     * @param ipAddress the address of the node holding the partition
+     * @param path the path of the partition on that node
+     */
+    public FilePartition(String ipAddress, String path) {
+        this.ipAddress = ipAddress;
+        this.path = path;
+    }
+
+    /** @return the IP address given at construction */
+    public String getIPAddress() {
+        return ipAddress;
+    }
+
+    /** @return the file path given at construction */
+    public String getPath() {
+        return path;
+    }
+
+    /** @return the address and path joined as {@code ip:path} */
+    @Override
+    public String toString() {
+        return ipAddress + ":" + path;
+    }
+
+    /**
+     * Builds the JSON form of this partition.
+     *
+     * @return a new object node with the fields "ip" and "path"
+     */
+    public ObjectNode toObjectNode() {
+        ObjectNode partition = OBJECT_MAPPER.createObjectNode();
+        partition.put("ip", ipAddress);
+        partition.put("path", path);
+        return partition;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java
new file mode 100644
index 0000000..bcc6914
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DdlApiServlet.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.http.server.IServletRequest;
+
+/**
+ * {@link RestApiServlet} variant that reads its statement text from the {@code ddl} request
+ * parameter and permits statements of the QUERY, UPDATE and DDL categories.
+ */
+public class DdlApiServlet extends RestApiServlet {
+    private static final String DDL_STMT_PARAM_NAME = "ddl";
+    // Bit mask of the statement categories this servlet accepts.
+    private static final byte ALLOWED_CATEGORIES =
+            Statement.Category.DDL | Statement.Category.UPDATE | Statement.Category.QUERY;
+
+    public DdlApiServlet(ConcurrentMap<String, Object> ctx, String[] paths,
+            ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory) {
+        super(ctx, paths, compilationProvider, statementExecutorFactory);
+    }
+
+    /** @return the bit mask of accepted statement categories */
+    @Override
+    protected byte getAllowedCategories() {
+        return ALLOWED_CATEGORIES;
+    }
+
+    /** @return the message template (with one {@code %s} placeholder) for rejected statements */
+    @Override
+    protected String getErrorMessage() {
+        return "Invalid statement: Non-DDL statement %s to the DDL API.";
+    }
+
+    /** @return the value of the {@code ddl} request parameter */
+    @Override
+    protected String getQueryParameter(IServletRequest request) {
+        return request.getParameter(DDL_STMT_PARAM_NAME);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
new file mode 100644
index 0000000..d91352d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Servlet that returns one JSON document combining thread dump, config and stats for the
+ * "cc" entry (requested with a {@code null} node id) and for every node name listed in the
+ * metadata properties.
+ */
+public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
+    private static final Logger LOGGER = Logger.getLogger(DiagnosticsApiServlet.class.getName());
+
+    public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+    }
+
+    /**
+     * Writes the diagnostics document for an empty request path.
+     * <p>
+     * Status mapping: {@link IllegalStateException} gives 503, {@link IllegalArgumentException}
+     * (including any non-empty path) gives 404, every other failure gives 500 with the
+     * exception text as body.
+     *
+     * @param request the incoming servlet request
+     * @param response the response to write to
+     * @throws IOException if setting the content type fails
+     */
+    @Override
+    protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException {
+        IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+        PrintWriter responseWriter = response.writer();
+        ObjectNode json;
+        ObjectMapper om = new ObjectMapper();
+        response.setStatus(HttpResponseStatus.OK);
+        om.enable(SerializationFeature.INDENT_OUTPUT);
+        try {
+            if (!"".equals(path(request))) {
+                throw new IllegalArgumentException();
+            }
+            json = getClusterDiagnosticsJSON();
+            responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json));
+        } catch (IllegalStateException e) { // NOSONAR - exception not logged or rethrown
+            response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+        } catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
+            response.setStatus(HttpResponseStatus.NOT_FOUND);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag; the generic handler below would otherwise swallow it.
+            Thread.currentThread().interrupt();
+            LOGGER.log(Level.INFO, "exception thrown for " + request, e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            responseWriter.write(e.toString());
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, "exception thrown for " + request, e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            responseWriter.write(e.toString());
+        }
+        responseWriter.flush();
+    }
+
+    /**
+     * Submits one task per (node, kind) pair to the executor stored in the servlet context and
+     * assembles the results into a document with the fields "cc", "ncs" and "date".
+     *
+     * @return the assembled diagnostics document
+     * @throws Exception if any task fails or the waiting thread is interrupted
+     */
+    private ObjectNode getClusterDiagnosticsJSON() throws Exception {
+        ObjectMapper om = new ObjectMapper();
+        IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+        ExecutorService executor = (ExecutorService) ctx.get(ServletConstants.EXECUTOR_SERVICE);
+        Map<String, Future<ObjectNode>> ccFutureData = new HashMap<>();
+        ccFutureData.put("threaddump",
+                executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(null)))));
+        ccFutureData.put("config",
+                executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(null, false, true)))));
+        ccFutureData.put("stats",
+                executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(null, true, false)))));
+
+        // All tasks are submitted before any result is awaited so they can run concurrently.
+        Map<String, Map<String, Future<ObjectNode>>> ncDataMap = new HashMap<>();
+        for (String nc : AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames()) {
+            Map<String, Future<ObjectNode>> ncData = new HashMap<>();
+            ncData.put("threaddump", executor.submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getThreadDump(nc)))));
+            ncData.put("config", executor
+                    .submit(() -> fixupKeys((ObjectNode) om.readTree(hcc.getNodeDetailsJSON(nc, false, true)))));
+            ncData.put("stats", executor.submit(() -> fixupKeys(processNodeStats(hcc, nc))));
+            ncDataMap.put(nc, ncData);
+        }
+        ObjectNode result = om.createObjectNode();
+        result.putPOJO("cc", resolveFutures(ccFutureData));
+        List<Map<String, ?>> ncList = new ArrayList<>();
+        for (Map.Entry<String, Map<String, Future<ObjectNode>>> entry : ncDataMap.entrySet()) {
+            final Map<String, Object> ncMap = resolveFutures(entry.getValue());
+            ncMap.put("node_id", entry.getKey());
+            ncList.add(ncMap);
+        }
+        result.putPOJO("ncs", ncList);
+        result.putPOJO("date", new Date());
+        return result;
+    }
+
+    /**
+     * Waits for every future in the map and returns the results under the same keys.
+     *
+     * @param futureMap the pending results keyed by name
+     * @return a new mutable map from each key to its completed value
+     * @throws ExecutionException if one of the tasks failed
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private Map<String, Object> resolveFutures(Map<String, Future<ObjectNode>> futureMap)
+            throws ExecutionException, InterruptedException {
+        Map<String, Object> result = new HashMap<>();
+        for (Map.Entry<String, Future<ObjectNode>> entry : futureMap.entrySet()) {
+            result.put(entry.getKey(), entry.getValue().get());
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java
new file mode 100644
index 0000000..ac79088
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedServlet.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.imageio.ImageIO;
+
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Servlet that serves classpath resources: "/" maps to {@code /feed/home.html}, any other
+ * request URI is used as the resource name. Resources ending in ".png" are sent as PNG images,
+ * everything else as UTF-8 HTML text.
+ */
+public class FeedServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(FeedServlet.class.getName());
+
+    public FeedServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+    }
+
+    /**
+     * Looks up the requested resource and writes it to the response.
+     * <p>
+     * Responds with 404 when the resource does not exist and with 500 when reading or writing
+     * it fails.
+     *
+     * @param request the incoming servlet request
+     * @param response the response the resource is written to
+     */
+    @Override
+    public void handle(IServletRequest request, IServletResponse response) {
+        try {
+            response.setStatus(HttpResponseStatus.OK);
+            String resourcePath;
+            String requestURI = request.getHttpRequest().uri();
+
+            if ("/".equals(requestURI)) {
+                IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML);
+                resourcePath = "/feed/home.html";
+            } else {
+                // NOTE(review): the client-supplied URI is used verbatim as a classpath resource
+                // name, so any resource visible to this class loader can be requested. Consider
+                // restricting it to a known prefix - confirm which paths legitimate clients use.
+                resourcePath = requestURI;
+            }
+
+            // try-with-resources closes the stream on every exit path; a null resource is skipped.
+            try (InputStream is = FeedServlet.class.getResourceAsStream(resourcePath)) {
+                if (is == null) {
+                    response.setStatus(HttpResponseStatus.NOT_FOUND);
+                    return;
+                }
+
+                // Special handler for .png resources
+                if (resourcePath.endsWith(".png")) {
+                    writeImage(is, resourcePath, response);
+                    return;
+                }
+
+                writeText(is, response);
+            }
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failure handling request", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    /**
+     * Decodes the stream as an image and writes it to the response re-encoded as PNG.
+     *
+     * @throws IOException if the stream cannot be read, decoded or written
+     */
+    private static void writeImage(InputStream is, String resourcePath, IServletResponse response)
+            throws IOException {
+        BufferedImage img = ImageIO.read(is);
+        if (img == null) {
+            // ImageIO.read returns null when no registered reader can decode the stream;
+            // passing that on to ImageIO.write would throw an unchecked exception.
+            throw new IOException("Unable to decode image resource " + resourcePath);
+        }
+        OutputStream outputStream = response.outputStream();
+        String formatName = "png";
+        IServletResponse.setContentType(response, IServlet.ContentType.IMG_PNG);
+        ImageIO.write(img, formatName, outputStream);
+    }
+
+    /**
+     * Copies the stream line by line to the response writer as UTF-8 HTML text.
+     *
+     * @throws IOException if the stream cannot be read or the content type cannot be set
+     */
+    private static void writeText(InputStream is, IServletResponse response) throws IOException {
+        IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+        // Decode explicitly as UTF-8 to match the declared response encoding instead of
+        // depending on the platform default charset.
+        BufferedReader br =
+                new BufferedReader(new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
+        StringBuilder sb = new StringBuilder();
+        for (String line = br.readLine(); line != null; line = br.readLine()) {
+            sb.append(line).append('\n');
+        }
+
+        PrintWriter out = response.writer();
+        out.println(sb.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java
new file mode 100644
index 0000000..7788136
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FullApiServlet.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.http.server.IServletRequest;
+
+/**
+ * {@link RestApiServlet} variant that reads its statement text from the {@code aql} request
+ * parameter and permits statements of the QUERY, UPDATE, DDL and PROCEDURE categories.
+ */
+public class FullApiServlet extends RestApiServlet {
+
+    // Bit mask of the statement categories this servlet accepts.
+    private static final byte ALLOWED_CATEGORIES = Statement.Category.PROCEDURE | Statement.Category.DDL
+            | Statement.Category.UPDATE | Statement.Category.QUERY;
+    private static final String AQL_STMT_PARAM_NAME = "aql";
+
+    public FullApiServlet(ConcurrentMap<String, Object> ctx, String[] paths,
+            ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory) {
+        super(ctx, paths, compilationProvider, statementExecutorFactory);
+    }
+
+    /** @return the value of the {@code aql} request parameter */
+    @Override
+    protected String getQueryParameter(IServletRequest request) {
+        return request.getParameter(AQL_STMT_PARAM_NAME);
+    }
+
+    /** @return the bit mask of accepted statement categories */
+    @Override
+    protected byte getAllowedCategories() {
+        return ALLOWED_CATEGORIES;
+    }
+
+    /**
+     * Always fails: this servlet defines no rejection message.
+     *
+     * @throws IllegalStateException on every call
+     */
+    @Override
+    protected String getErrorMessage() {
+        throw new IllegalStateException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
new file mode 100644
index 0000000..c1423e7
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.runtime.util.ClusterStateManager;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class NodeControllerDetailsApiServlet extends ClusterApiServlet {
+
+ private static final Logger LOGGER = Logger.getLogger(NodeControllerDetailsApiServlet.class.getName());
+ private final ObjectMapper om = new ObjectMapper();
+
+ public NodeControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+ super(ctx, paths);
+ om.enable(SerializationFeature.INDENT_OUTPUT);
+ }
+
+ @Override
+ protected void getUnsafe(IServletRequest request, IServletResponse response) throws IOException {
+ PrintWriter responseWriter = response.writer();
+ IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+ try {
+ ObjectNode json;
+ response.setStatus(HttpResponseStatus.OK);
+ if ("".equals(path(request))) {
+ json = om.createObjectNode();
+ json.set("ncs", getClusterStateJSON(request, "../").get("ncs"));
+ } else {
+ json = processNode(request, hcc);
+ }
+ IServletResponse.setContentType(response, IServlet.ContentType.APPLICATION_JSON, IServlet.Encoding.UTF8);
+ responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(json));
+ } catch (IllegalStateException e) { // NOSONAR - exception not logged or rethrown
+ response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+ } catch (IllegalArgumentException e) { // NOSONAR - exception not logged or rethrown
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ } catch (Exception e) {
+ LOGGER.log(Level.INFO, "exception thrown for " + request, e);
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ responseWriter.write(e.toString());
+ }
+ responseWriter.flush();
+ }
+
+ private ObjectNode processNode(IServletRequest request, IHyracksClientConnection hcc)
+ throws Exception {
+ String pathInfo = path(request);
+ if (pathInfo.endsWith("/")) {
+ throw new IllegalArgumentException();
+ }
+ String[] parts = pathInfo.substring(1).split("/");
+ final String node = parts[0];
+
+ if (parts.length == 1) {
+ ArrayNode ncs = (ArrayNode) getClusterStateJSON(request, "../../").get("ncs");
+ for (int i = 0; i < ncs.size(); i++) {
+ if (node.equals(ncs.get(i).get("node_id").asText())) {
+ return (ObjectNode) ncs.get(i);
+ }
+ }
+ if ("cc".equals(node)) {
+ return om.createObjectNode();
+ }
+ throw new IllegalArgumentException();
+ } else if (parts.length == 2) {
+ ObjectNode json;
+
+ switch (parts[1]) {
+ case "config":
+ json = processNodeConfig(hcc, node);
+ break;
+
+ case "stats":
+ json = processNodeStats(hcc, node);
+ break;
+
+ case "threaddump":
+ return processNodeThreadDump(hcc, node);
+
+ default:
+ throw new IllegalArgumentException();
+ }
+ fixupKeys(json);
+
+ return json;
+ } else {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ protected ObjectNode fixupKeys(ObjectNode json) {
+ // TODO (mblow): generate the keys with _ to begin with
+ List<String> keys = new ArrayList<>();
+ for (Iterator<String> iter = json.fieldNames(); iter.hasNext();) {
+ keys.add(iter.next());
+ }
+ for (String key : keys) {
+ String newKey = key.replace('-', '_');
+ if (!newKey.equals(key)) {
+ json.set(newKey, json.remove(key));
+ }
+ }
+ return json;
+ }
+
+ protected ObjectNode processNodeStats(IHyracksClientConnection hcc, String node) throws Exception {
+ final String details = hcc.getNodeDetailsJSON(node, true, false);
+ if (details == null) {
+ throw new IllegalArgumentException();
+ }
+ ObjectNode json = (ObjectNode) om.readTree(details);
+ int index = json.get("rrd-ptr").asInt() - 1;
+ json.remove("rrd-ptr");
+
+ List<String> keys = new ArrayList<>();
+ for (Iterator<String> iter = json.fieldNames(); iter.hasNext();) {
+ keys.add(iter.next());
+ }
+
+ final ArrayNode gcNames = (ArrayNode) json.get("gc-names");
+ final ArrayNode gcCollectionTimes = (ArrayNode) json.get("gc-collection-times");
+ final ArrayNode gcCollectionCounts = (ArrayNode) json.get("gc-collection-counts");
+
+ for (String key : keys) {
+ if (key.startsWith("gc-")) {
+ json.remove(key);
+ } else {
+ final JsonNode keyNode = json.get(key);
+ if (keyNode instanceof ArrayNode) {
+ final ArrayNode valueArray = (ArrayNode) keyNode;
+ // fixup an index of -1 to the final element in the array (i.e. RRD_SIZE)
+ if (index == -1) {
+ index = valueArray.size() - 1;
+ }
+ final JsonNode value = valueArray.get(index);
+ json.remove(key);
+ json.set(key.replaceAll("s$",""), value);
+ }
+ }
+ }
+ ArrayNode gcs = om.createArrayNode();
+
+ for (int i = 0; i < gcNames.size(); i++) {
+ ObjectNode gc = om.createObjectNode();
+ gc.set("name", gcNames.get(i));
+ gc.set("collection-time", ((ArrayNode) gcCollectionTimes.get(i)).get(index));
+ gc.set("collection-count", ((ArrayNode) gcCollectionCounts.get(i)).get(index));
+ gcs.add(gc);
+ }
+ json.set("gcs", gcs);
+
+ return json;
+ }
+
+ private ObjectNode processNodeConfig(IHyracksClientConnection hcc, String node) throws Exception {
+ String config = hcc.getNodeDetailsJSON(node, false, true);
+ if (config == null) {
+ throw new IllegalArgumentException();
+ }
+ return (ObjectNode) om.readTree(config);
+ }
+
+ private ObjectNode processNodeThreadDump(IHyracksClientConnection hcc, String node) throws Exception {
+ if ("cc".equals(node)) {
+ return om.createObjectNode();
+ }
+ String dump = hcc.getThreadDump(node);
+ if (dump == null) {
+ // check to see if this is a node that is simply down
+ throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null
+ ? new IllegalStateException()
+ : new IllegalArgumentException();
+ }
+ return (ObjectNode) om.readTree(dump);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java
new file mode 100644
index 0000000..917d9a8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryApiServlet.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.hyracks.http.server.IServletRequest;
+
+/** {@link RestApiServlet} whose allowed statement categories are limited to {@code Statement.Category.QUERY}. */
+public class QueryApiServlet extends RestApiServlet {
+    public QueryApiServlet(ConcurrentMap<String, Object> ctx, String[] paths,
+            ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory) {
+        super(ctx, paths, compilationProvider, statementExecutorFactory);
+    }
+
+    @Override
+    protected byte getAllowedCategories() {
+        return Statement.Category.QUERY;
+    }
+
+    @Override
+    protected String getErrorMessage() {
+        return "Invalid statement: Non-query statement %s to the query API.";
+    }
+
+    @Override
+    protected String getQueryParameter(IServletRequest request) {
+        // the statement text is taken from the "query" request parameter
+        return request.getParameter("query");
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60e7f12b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
new file mode 100644
index 0000000..6240f51
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.http.servlet.HyracksProperties;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.result.ResultUtil;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.IServlet;
+import org.apache.hyracks.http.server.IServletRequest;
+import org.apache.hyracks.http.server.IServletResponse;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Serves a stored query result: a GET request passes a {@code handle} parameter holding JSON shaped like
+ * {@code {"handle": [jobId, resultSetId]}}; that result set is opened and printed via {@code ResultUtil}.
+ */
+public class QueryResultApiServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(QueryResultApiServlet.class.getName());
+    // one mapper shared by all requests instead of a new instance per call
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+    }
+
+    /** Replies 405 to non-GET requests and 400, with the exception message as body, on any failure. */
+    @Override
+    public void handle(IServletRequest request, IServletResponse response) {
+        // equals() instead of != so the check does not depend on HttpMethod instances being shared
+        if (!HttpMethod.GET.equals(request.getHttpRequest().method())) {
+            response.setStatus(HttpResponseStatus.METHOD_NOT_ALLOWED);
+            return;
+        }
+        response.setStatus(HttpResponseStatus.OK);
+        // TODO this seems wrong ...
+        try {
+            IServletResponse.setContentType(response, IServlet.ContentType.TEXT_HTML, IServlet.Encoding.UTF8);
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failure setting content type", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            return;
+        }
+        String strHandle = request.getParameter("handle");
+        PrintWriter out = response.writer();
+        try {
+            if (strHandle == null || strHandle.isEmpty()) {
+                throw new AsterixException("Empty request, no handle provided");
+            }
+            // check the shape first so a malformed handle gets a clear message instead of an NPE or cast failure
+            Object parsed = OBJECT_MAPPER.readTree(strHandle);
+            Object ids = parsed instanceof ObjectNode ? ((ObjectNode) parsed).get("handle") : null;
+            if (!(ids instanceof ArrayNode) || ((ArrayNode) ids).size() < 2) {
+                throw new AsterixException("Invalid handle, expected {\"handle\": [jobId, resultSetId]}");
+            }
+            ArrayNode handle = (ArrayNode) ids;
+            IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+            if (hds == null || ctx.get(HYRACKS_CONNECTION_ATTR) == null) {
+                synchronized (ctx) {
+                    IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+                    if (hcc == null) {
+                        // connection settings are read only when a connection has to be created
+                        HyracksProperties hp = new HyracksProperties();
+                        hcc = new HyracksConnection(hp.getHyracksIPAddress(), hp.getHyracksPort());
+                        ctx.put(HYRACKS_CONNECTION_ATTR, hcc);
+                    }
+                    hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+                    if (hds == null) {
+                        hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                        ctx.put(HYRACKS_DATASET_ATTR, hds);
+                    }
+                }
+            }
+            JobId jobId = new JobId(handle.get(0).asLong());
+            ResultSetId rsId = new ResultSetId(handle.get(1).asLong());
+            ResultReader resultReader = new ResultReader(hds);
+            resultReader.open(jobId, rsId);
+
+            // QQQ The output format is determined by the initial query and cannot be modified here, so calling
+            // back to initResponse() is really an error. We need to find a way to send the same OutputFormat value
+            // here as was originally determined there. Need to save this value on some object that we can obtain here.
+            SessionConfig sessionConfig = RestApiServlet.initResponse(request, response);
+            ResultUtil.printResults(resultReader, sessionConfig, new Stats(), null);
+        } catch (Exception e) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            out.println(e.getMessage());
+            LOGGER.log(Level.WARNING, "Error retrieving result", e);
+        }
+        if (out.checkError()) {
+            LOGGER.warning("Error flushing output writer");
+        }
+    }
+}