You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2019/08/14 16:32:10 UTC
[hive] branch master updated: HIVE-13457 : Create HS2 REST API
endpoints for monitoring information (Pawel Szostek via Szehon)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 71605e6 HIVE-13457 : Create HS2 REST API endpoints for monitoring information (Pawel Szostek via Szehon)
71605e6 is described below
commit 71605e65ef0fa6a98036bb956db80b285835e1f3
Author: Szehon Ho <sz...@criteo.com>
AuthorDate: Wed Aug 7 10:57:38 2019 -0700
HIVE-13457 : Create HS2 REST API endpoints for monitoring information (Pawel Szostek via Szehon)
---
service/pom.xml | 12 ++
.../apache/hive/service/server/HiveServer2.java | 7 +
.../service/servlet/QueriesRESTfulAPIServlet.java | 178 ++++++++++++++++++++
.../hive/service/server/TestHS2HttpServer.java | 181 ++++++++++++++++++---
4 files changed, 352 insertions(+), 26 deletions(-)
diff --git a/service/pom.xml b/service/pom.xml
index c73a621..a75021e 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -300,6 +300,18 @@
<version>${apache-directory-server.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 2f3767f..5d81668 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -106,6 +106,7 @@ import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
import org.apache.hive.service.servlet.HS2LeadershipStatus;
import org.apache.hive.service.servlet.HS2Peers;
+import org.apache.hive.service.servlet.QueriesRESTfulAPIServlet;
import org.apache.hive.service.servlet.QueryProfileServlet;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -171,6 +172,11 @@ public class HiveServer2 extends CompositeService {
}
@VisibleForTesting
+ public CLIService getCliService() {
+ return this.cliService;
+ }
+
+ @VisibleForTesting
public void setPamAuthenticator(PamAuthenticator pamAuthenticator) {
this.pamAuthenticator = pamAuthenticator;
}
@@ -403,6 +409,7 @@ public class HiveServer2 extends CompositeService {
webServer = builder.build();
webServer.addServlet("query_page", "/query_page", QueryProfileServlet.class);
+ webServer.addServlet("api", "/api/*", QueriesRESTfulAPIServlet.class);
}
}
} catch (IOException ie) {
diff --git a/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java b/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java
new file mode 100644
index 0000000..990ec6e
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.servlet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.QueryInfo;
+import org.apache.hive.service.cli.operation.OperationManager;
+import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+
+/**
+ * QueriesRESTfulAPIServlet.
+ *
+ */
+public class QueriesRESTfulAPIServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(QueriesRESTfulAPIServlet.class);
+
+ private static final String API_V1 = "v1";
+ private static final String REQ_QUERIES = "queries";
+ private static final String REQ_SESSIONS = "sessions";
+ private static final String REQ_ACTIVE = "active";
+ private static final String REQ_HISTORICAL = "historical";
+
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ /*
+ Available endpoints are:
+ - /v1/queries/active
+ - /v1/queries/historical
+ - /v1/sessions
+ */
+
+ String pathInfo = request.getPathInfo();
+ if (pathInfo == null || "/".equals(pathInfo)) {
+ sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Path to the endpoint is missing");
+ return;
+ }
+
+
+ String[] splits = pathInfo.split("/");
+ if (splits.length < 3) { //expecting at least 2 parts in the path
+ sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Expecting at least 2 parts in the path");
+ return;
+ }
+
+ ServletContext ctx = getServletContext();
+ SessionManager sessionManager =
+ (SessionManager) ctx.getAttribute("hive.sm");
+ OperationManager operationManager = sessionManager.getOperationManager();
+
+ String apiVersion = splits[1];
+ if (apiVersion.equals(API_V1)) {
+ String reqType = splits[2];
+ if (reqType.equals(REQ_QUERIES)) {
+ if (splits.length != 4) {
+ sendError(response, HttpServletResponse.SC_NOT_FOUND,
+ "Expecting 3 parts in the path: /v1/queries/active or /v1/queries/historical");
+ return;
+ }
+ String queriesType = splits[3];
+ if (queriesType.equals(REQ_ACTIVE)) {
+ Collection<QueryInfo> operations = operationManager.getLiveQueryInfos();
+ LOG.info("Returning active SQL operations via the RESTful API");
+ sendAsJson(response, operations);
+ } else if (queriesType.equals(REQ_HISTORICAL)) {
+ Collection<QueryInfo> operations = operationManager.getHistoricalQueryInfos();
+ LOG.info("Returning historical SQL operations via the RESTful API");
+ sendAsJson(response, operations);
+ } else {
+ sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Unknown query type: " + queriesType);
+ return;
+ }
+ } else if (reqType.equals(REQ_SESSIONS)) {
+ Collection<HiveSession> hiveSessions = sessionManager.getSessions();
+ LOG.info("Returning active sessions via the RESTful API");
+ sendAsJson(response, hiveSessions);
+ } else { // unrecognized request
+ sendError(response, HttpServletResponse.SC_NOT_FOUND, "Unknown request type: " + reqType);
+ return;
+ }
+ } else { // unrecognized API version
+ sendError(response, HttpServletResponse.SC_BAD_REQUEST, "This server only handles API v1");
+ return;
+ }
+ }
+
+ private void sendError(HttpServletResponse response,
+ Integer errorCode,
+ String message) {
+ response.setStatus(errorCode);
+ response.setContentType("application/json");
+ response.setCharacterEncoding("UTF-8");
+ try {
+ response.getWriter().write("{\"message\" : " + message + "}");
+ } catch (IOException e) {
+ LOG.error("Caught an exception while writing an HTTP error status", e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private void sendAsJson(
+ HttpServletResponse response,
+ Object obj) {
+ response.setContentType("application/json");
+ response.setStatus(HttpServletResponse.SC_OK);
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule("CustomSessionModule", new Version(1, 0, 0, null));
+ module.addSerializer(HiveSession.class, new HiveSessionSerializer());
+ mapper.registerModule(module);
+
+ try {
+ PrintWriter out = response.getWriter();
+ String objectAsJson = mapper.writeValueAsString(obj);
+ out.print(objectAsJson);
+ out.flush();
+ out.close();
+ } catch (IOException e) {
+ LOG.error("Caught an exception while writing an HTTP response", e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private static class HiveSessionSerializer extends JsonSerializer<HiveSession> {
+ @Override
+ public void serialize(
+ HiveSession hiveSession,
+ JsonGenerator jgen,
+ SerializerProvider serializerProvider)
+ throws IOException, JsonProcessingException {
+ long currentTime = System.currentTimeMillis();
+
+ jgen.writeStartObject();
+ jgen.writeStringField("sessionId", hiveSession.getSessionHandle().getSessionId().toString());
+ jgen.writeStringField("username", hiveSession.getUserName());
+ jgen.writeStringField("ipAddress", hiveSession.getIpAddress());
+ jgen.writeNumberField("operationCount", hiveSession.getOpenOperationCount());
+ jgen.writeNumberField("activeTime", (currentTime - hiveSession.getCreationTime()) / 1000);
+ jgen.writeNumberField("idleTime", (currentTime - hiveSession.getLastAccessTime()) / 1000);
+ jgen.writeEndObject();
+ }
+ }
+}
diff --git a/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java b/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java
index bb6a231..3047443 100644
--- a/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java
+++ b/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java
@@ -18,52 +18,65 @@
package org.apache.hive.service.server;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertNotNull;
-
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
/**
- * TestHS2HttpServer -- executes tests of HiveServer2 HTTP Server
+ * TestHS2HttpServer -- executes tests of HiveServer2 HTTP Server.
*/
public class TestHS2HttpServer {
private static HiveServer2 hiveServer2 = null;
+ private static CLIService client = null;
+ private static SessionManager sm = null;
private static HiveConf hiveConf = null;
private static String metastorePasswd = "61ecbc41cdae3e6b32712a06c73606fa"; //random md5
private static Integer webUIPort = null;
+ private static String apiBaseURL = null;
+
@BeforeClass
public static void beforeTests() throws Exception {
webUIPort = MetaStoreTestUtils.findFreePortExcepting(
Integer.valueOf(ConfVars.HIVE_SERVER2_WEBUI_PORT.getDefaultValue()));
+ apiBaseURL = "http://localhost:" + webUIPort + "/api/v1";
hiveConf = new HiveConf();
hiveConf.set(ConfVars.METASTOREPWD.varname, metastorePasswd);
hiveConf.set(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, webUIPort.toString());
- hiveConf
- .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
Exception hs2Exception = null;
boolean hs2Started = false;
@@ -72,6 +85,7 @@ public class TestHS2HttpServer {
hiveServer2 = new HiveServer2();
hiveServer2.init(hiveConf);
hiveServer2.start();
+ client = hiveServer2.getCliService();
Thread.sleep(5000);
hs2Started = true;
break;
@@ -85,10 +99,10 @@ public class TestHS2HttpServer {
webUIPort = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT);
}
}
-
if (!hs2Started) {
- throw(hs2Exception);
+ throw (hs2Exception);
}
+ sm = hiveServer2.getCliService().getSessionManager();
}
@Test
@@ -122,6 +136,121 @@ public class TestHS2HttpServer {
assertNotNull(xContentTypeHeader);
}
+ private BufferedReader getReaderForUrl(String urlString) throws Exception {
+ URL url = new URL(urlString);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ BufferedReader reader =
+ new BufferedReader(new InputStreamReader(conn.getInputStream()));
+ return reader;
+ }
+
+ private String readFromUrl(String urlString) throws Exception {
+ BufferedReader reader = getReaderForUrl(urlString);
+ StringBuilder response = new StringBuilder();
+ String inputLine;
+
+ while ((inputLine = reader.readLine()) != null) {
+ response.append(inputLine);
+ }
+ reader.close();
+ return response.toString();
+ }
+
+ private static List<JsonNode> getListOfNodes(String json) throws Exception {
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode rootNode = objectMapper.readTree(json);
+
+ ArrayList<JsonNode> nodes = new ArrayList<>();
+ if (rootNode.isArray()) {
+ for (final JsonNode node : rootNode) {
+ nodes.add(node);
+ }
+ }
+ return nodes;
+ }
+
+ @Test
+ public void testApiServletHistoricalQueries() throws Exception {
+ String historicalQueriesRoute = "/queries/historical";
+
+ final SessionHandle handle =
+ sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1",
+ new HashMap());
+
+ String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ + " = false";
+ OperationHandle opHandle = client.executeStatement(handle, queryString, new HashMap());
+ client.closeOperation(opHandle);
+
+ opHandle = client.executeStatement(handle, "SELECT 1", new HashMap());
+ client.closeOperation(opHandle);
+
+ String queriesResponse = readFromUrl(apiBaseURL + historicalQueriesRoute);
+ List<JsonNode> historicalQueries = getListOfNodes(queriesResponse);
+ Assert.assertTrue(historicalQueries.size() == 1);
+
+ JsonNode historicalQuery = historicalQueries.get(0);
+ Assert.assertEquals(historicalQuery.path("running").asBoolean(), false);
+ Assert.assertEquals(historicalQuery.path("state").asText(), "FINISHED");
+ Assert.assertTrue(historicalQuery.path("runtime").canConvertToInt());
+ Assert.assertTrue(historicalQuery.path("queryDisplay").isObject());
+ }
+
+ @Test
+ public void testApiServletActiveSessions() throws Exception {
+ String sessionsRoute = "/sessions";
+
+ String initNoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute);
+ Assert.assertTrue("[]".equals(initNoSessionsResponse));
+
+ SessionHandle handle1 =
+ sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1",
+ new HashMap());
+
+ String oneSessionResponse = readFromUrl(apiBaseURL + sessionsRoute);
+
+ List<JsonNode> sessionNodes = getListOfNodes(oneSessionResponse);
+ Assert.assertEquals(sessionNodes.size(), 1);
+
+ JsonNode session = sessionNodes.get(0);
+ Assert.assertEquals(session.path("sessionId").asText(), handle1.getSessionId().toString());
+ Assert.assertEquals(session.path("username").asText(), "user");
+ Assert.assertEquals(session.path("ipAddress").asText(), "127.0.0.1");
+ Assert.assertEquals(session.path("operationCount").asInt(), 0);
+ Assert.assertTrue(session.path("activeTime").canConvertToInt());
+ Assert.assertTrue(session.path("idleTime").canConvertToInt());
+
+ SessionHandle handle2 = sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1",
+ new HashMap());
+
+ String twoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute);
+ List<JsonNode> twoSessionsNodes = getListOfNodes(twoSessionsResponse);
+ Assert.assertEquals(twoSessionsNodes.size(), 2);
+
+ sm.closeSession(handle1);
+ sm.closeSession(handle2);
+
+ String endNoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute);
+ Assert.assertTrue("[]".equals(endNoSessionsResponse));
+ }
+
+ @Test
+ public void testWrongApiVersion() throws Exception {
+ String wrongApiVersionUrl = "http://localhost:" + webUIPort + "/api/v2";
+ URL url = new URL(wrongApiVersionUrl);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ Assert.assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
+ }
+
+ @Test
+ public void testWrongRoute() throws Exception {
+ String wrongRouteUrl = "http://localhost:" + webUIPort + "/api/v1/nonexistingRoute";
+ URL url = new URL(wrongRouteUrl);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ Assert.assertEquals(HttpURLConnection.HTTP_NOT_FOUND, conn.getResponseCode());
+ }
+
@Test
public void testContextRootUrlRewrite() throws Exception {
String datePattern = "[a-zA-Z]{3} [a-zA-Z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}";
@@ -145,18 +274,18 @@ public class TestHS2HttpServer {
CloseableHttpClient httpclient = null;
try {
httpclient = HttpClients.createDefault();
- HttpGet httpGet = new HttpGet("http://localhost:"+webUIPort+"/conf");
+ HttpGet httpGet = new HttpGet("http://localhost:" + webUIPort + "/conf");
CloseableHttpResponse response1 = httpclient.execute(httpGet);
try {
HttpEntity entity1 = response1.getEntity();
BufferedReader br = new BufferedReader(new InputStreamReader(entity1.getContent()));
String line;
- while ((line = br.readLine())!= null) {
- if (line.contains(metastorePasswd)){
+ while ((line = br.readLine()) != null) {
+ if (line.contains(metastorePasswd)) {
pwdValFound = line;
}
- if (line.contains(ConfVars.METASTOREPWD.varname)){
+ if (line.contains(ConfVars.METASTOREPWD.varname)) {
pwdKeyFound = line;
}
}
@@ -165,7 +294,7 @@ public class TestHS2HttpServer {
response1.close();
}
} finally {
- if (httpclient != null){
+ if (httpclient != null) {
httpclient.close();
}
}