Posted to commits@hive.apache.org by sz...@apache.org on 2019/08/14 16:32:10 UTC

[hive] branch master updated: HIVE-13457 : Create HS2 REST API endpoints for monitoring information (Pawel Szostek via Szehon)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 71605e6  HIVE-13457 : Create HS2 REST API endpoints for monitoring information (Pawel Szostek via Szehon)
71605e6 is described below

commit 71605e65ef0fa6a98036bb956db80b285835e1f3
Author: Szehon Ho <sz...@criteo.com>
AuthorDate: Wed Aug 7 10:57:38 2019 -0700

    HIVE-13457 : Create HS2 REST API endpoints for monitoring information (Pawel Szostek via Szehon)
---
 service/pom.xml                                    |  12 ++
 .../apache/hive/service/server/HiveServer2.java    |   7 +
 .../service/servlet/QueriesRESTfulAPIServlet.java  | 178 ++++++++++++++++++++
 .../hive/service/server/TestHS2HttpServer.java     | 181 ++++++++++++++++++---
 4 files changed, 352 insertions(+), 26 deletions(-)
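
For context, this patch registers the new servlet at /api/*, so the monitoring
endpoints it adds (/api/v1/queries/active, /api/v1/queries/historical and
/api/v1/sessions) are served from the HiveServer2 Web UI port. A minimal client
sketch, assuming a local HiveServer2 with the Web UI on its default port
(10002; adjust to match hive.server2.webui.port in your deployment). The class
name and URL are illustrative only and not part of the patch:

    // Illustrative sketch: fetches one of the endpoints added by this patch and
    // prints the raw JSON. Assumes HiveServer2 runs locally with the Web UI on
    // port 10002; adjust host/port for your deployment.
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class Hs2MonitoringClient {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:10002/api/v1/queries/active");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
          String line;
          while ((line = reader.readLine()) != null) {
            System.out.println(line); // JSON array describing the active queries
          }
        } finally {
          conn.disconnect();
        }
      }
    }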

diff --git a/service/pom.xml b/service/pom.xml
index c73a621..a75021e 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -300,6 +300,18 @@
       <version>${apache-directory-server.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 2f3767f..5d81668 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -106,6 +106,7 @@ import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
 import org.apache.hive.service.servlet.HS2LeadershipStatus;
 import org.apache.hive.service.servlet.HS2Peers;
+import org.apache.hive.service.servlet.QueriesRESTfulAPIServlet;
 import org.apache.hive.service.servlet.QueryProfileServlet;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -171,6 +172,11 @@ public class HiveServer2 extends CompositeService {
   }
 
   @VisibleForTesting
+  public CLIService getCliService() {
+    return this.cliService;
+  }
+
+  @VisibleForTesting
   public void setPamAuthenticator(PamAuthenticator pamAuthenticator) {
     this.pamAuthenticator = pamAuthenticator;
   }
@@ -403,6 +409,7 @@ public class HiveServer2 extends CompositeService {
 
           webServer = builder.build();
           webServer.addServlet("query_page", "/query_page", QueryProfileServlet.class);
+          webServer.addServlet("api", "/api/*", QueriesRESTfulAPIServlet.class);
         }
       }
     } catch (IOException ie) {
diff --git a/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java b/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java
new file mode 100644
index 0000000..990ec6e
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.servlet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.QueryInfo;
+import org.apache.hive.service.cli.operation.OperationManager;
+import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+
+/**
+ * QueriesRESTfulAPIServlet.
+ *
+ */
+public class QueriesRESTfulAPIServlet extends HttpServlet {
+  private static final long serialVersionUID = 1L;
+  private static final Log LOG = LogFactory.getLog(QueriesRESTfulAPIServlet.class);
+
+  private static final String API_V1 = "v1";
+  private static final String REQ_QUERIES = "queries";
+  private static final String REQ_SESSIONS = "sessions";
+  private static final String REQ_ACTIVE = "active";
+  private static final String REQ_HISTORICAL = "historical";
+
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+        /*
+            Available endpoints are:
+             - /v1/queries/active
+             - /v1/queries/historical
+             - /v1/sessions
+        */
+
+    String pathInfo = request.getPathInfo();
+    if (pathInfo == null || "/".equals(pathInfo)) {
+      sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Path to the endpoint is missing");
+      return;
+    }
+
+
+    String[] splits = pathInfo.split("/");
+    if (splits.length < 3) { //expecting at least 2 parts in the path
+      sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Expecting at least 2 parts in the path");
+      return;
+    }
+
+    ServletContext ctx = getServletContext();
+    SessionManager sessionManager =
+        (SessionManager) ctx.getAttribute("hive.sm");
+    OperationManager operationManager = sessionManager.getOperationManager();
+
+    String apiVersion = splits[1];
+    if (apiVersion.equals(API_V1)) {
+      String reqType = splits[2];
+      if (reqType.equals(REQ_QUERIES)) {
+        if (splits.length != 4) {
+          sendError(response, HttpServletResponse.SC_NOT_FOUND,
+              "Expecting 3 parts in the path: /v1/queries/active or /v1/queries/historical");
+          return;
+        }
+        String queriesType = splits[3];
+        if (queriesType.equals(REQ_ACTIVE)) {
+          Collection<QueryInfo> operations = operationManager.getLiveQueryInfos();
+          LOG.info("Returning active SQL operations via the RESTful API");
+          sendAsJson(response, operations);
+        } else if (queriesType.equals(REQ_HISTORICAL)) {
+          Collection<QueryInfo> operations = operationManager.getHistoricalQueryInfos();
+          LOG.info("Returning historical SQL operations via the RESTful API");
+          sendAsJson(response, operations);
+        } else {
+          sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Unknown query type: " + queriesType);
+          return;
+        }
+      } else if (reqType.equals(REQ_SESSIONS)) {
+        Collection<HiveSession> hiveSessions = sessionManager.getSessions();
+        LOG.info("Returning active sessions via the RESTful API");
+        sendAsJson(response, hiveSessions);
+      } else { // unrecognized request
+        sendError(response, HttpServletResponse.SC_NOT_FOUND, "Unknown request type: " + reqType);
+        return;
+      }
+    } else { // unrecognized API version
+      sendError(response, HttpServletResponse.SC_BAD_REQUEST, "This server only handles API v1");
+      return;
+    }
+  }
+
+  private void sendError(HttpServletResponse response,
+                         Integer errorCode,
+                         String message) {
+    response.setStatus(errorCode);
+    response.setContentType("application/json");
+    response.setCharacterEncoding("UTF-8");
+    try {
+      response.getWriter().write("{\"message\" : " + message + "}");
+    } catch (IOException e) {
+      LOG.error("Caught an exception while writing an HTTP error status", e);
+      response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  private void sendAsJson(
+      HttpServletResponse response,
+      Object obj) {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    ObjectMapper mapper = new ObjectMapper();
+    SimpleModule module = new SimpleModule("CustomSessionModule", new Version(1, 0, 0, null));
+    module.addSerializer(HiveSession.class, new HiveSessionSerializer());
+    mapper.registerModule(module);
+
+    try {
+      PrintWriter out = response.getWriter();
+      String objectAsJson = mapper.writeValueAsString(obj);
+      out.print(objectAsJson);
+      out.flush();
+      out.close();
+    } catch (IOException e) {
+      LOG.error("Caught an exception while writing an HTTP response", e);
+      response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  private static class HiveSessionSerializer extends JsonSerializer<HiveSession> {
+    @Override
+    public void serialize(
+        HiveSession hiveSession,
+        JsonGenerator jgen,
+        SerializerProvider serializerProvider)
+        throws IOException, JsonProcessingException {
+      long currentTime = System.currentTimeMillis();
+
+      jgen.writeStartObject();
+      jgen.writeStringField("sessionId", hiveSession.getSessionHandle().getSessionId().toString());
+      jgen.writeStringField("username", hiveSession.getUserName());
+      jgen.writeStringField("ipAddress", hiveSession.getIpAddress());
+      jgen.writeNumberField("operationCount", hiveSession.getOpenOperationCount());
+      jgen.writeNumberField("activeTime", (currentTime - hiveSession.getCreationTime()) / 1000);
+      jgen.writeNumberField("idleTime", (currentTime - hiveSession.getLastAccessTime()) / 1000);
+      jgen.writeEndObject();
+    }
+  }
+}
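
The /v1/sessions endpoint above serializes each HiveSession through the custom
HiveSessionSerializer, so a response is a JSON array of objects with sessionId,
username, ipAddress, operationCount, activeTime and idleTime fields. A minimal
sketch of reading such a payload with Jackson 2 (the library the updated test
below uses); the sample string, UUID and class name are made up for
illustration:

    // Illustrative sketch: parses a /api/v1/sessions-style payload. The sample
    // JSON mirrors the fields written by HiveSessionSerializer above.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SessionsResponseExample {
      public static void main(String[] args) throws Exception {
        String sample = "[{\"sessionId\":\"8c7e4a2f-1d3b-4c5a-9e6f-0a1b2c3d4e5f\","
            + "\"username\":\"user\",\"ipAddress\":\"127.0.0.1\","
            + "\"operationCount\":0,\"activeTime\":12,\"idleTime\":4}]";
        JsonNode root = new ObjectMapper().readTree(sample);
        for (JsonNode session : root) { // the response is a JSON array
          System.out.println(session.path("sessionId").asText()
              + " owned by " + session.path("username").asText()
              + ", idle for " + session.path("idleTime").asLong() + "s");
        }
      }
    }
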
diff --git a/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java b/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java
index bb6a231..3047443 100644
--- a/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java
+++ b/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java
@@ -18,52 +18,65 @@
 
 package org.apache.hive.service.server;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertNotNull;
-
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 /**
- * TestHS2HttpServer -- executes tests of HiveServer2 HTTP Server
+ * TestHS2HttpServer -- executes tests of HiveServer2 HTTP Server.
  */
 public class TestHS2HttpServer {
 
   private static HiveServer2 hiveServer2 = null;
+  private static CLIService client = null;
+  private static SessionManager sm = null;
   private static HiveConf hiveConf = null;
   private static String metastorePasswd = "61ecbc41cdae3e6b32712a06c73606fa"; //random md5
   private static Integer webUIPort = null;
+  private static String apiBaseURL = null;
+
 
   @BeforeClass
   public static void beforeTests() throws Exception {
     webUIPort = MetaStoreTestUtils.findFreePortExcepting(
         Integer.valueOf(ConfVars.HIVE_SERVER2_WEBUI_PORT.getDefaultValue()));
+    apiBaseURL = "http://localhost:" + webUIPort + "/api/v1";
     hiveConf = new HiveConf();
     hiveConf.set(ConfVars.METASTOREPWD.varname, metastorePasswd);
     hiveConf.set(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, webUIPort.toString());
-    hiveConf
-        .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-            "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
 
     Exception hs2Exception = null;
     boolean hs2Started = false;
@@ -72,6 +85,7 @@ public class TestHS2HttpServer {
         hiveServer2 = new HiveServer2();
         hiveServer2.init(hiveConf);
         hiveServer2.start();
+        client = hiveServer2.getCliService();
         Thread.sleep(5000);
         hs2Started = true;
         break;
@@ -85,10 +99,10 @@ public class TestHS2HttpServer {
         webUIPort = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT);
       }
     }
-
     if (!hs2Started) {
-      throw(hs2Exception);
+      throw (hs2Exception);
     }
+    sm = hiveServer2.getCliService().getSessionManager();
   }
 
   @Test
@@ -122,6 +136,121 @@ public class TestHS2HttpServer {
     assertNotNull(xContentTypeHeader);
   }
 
+  private BufferedReader getReaderForUrl(String urlString) throws Exception {
+    URL url = new URL(urlString);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    BufferedReader reader =
+        new BufferedReader(new InputStreamReader(conn.getInputStream()));
+    return reader;
+  }
+
+  private String readFromUrl(String urlString) throws Exception {
+    BufferedReader reader = getReaderForUrl(urlString);
+    StringBuilder response = new StringBuilder();
+    String inputLine;
+
+    while ((inputLine = reader.readLine()) != null) {
+      response.append(inputLine);
+    }
+    reader.close();
+    return response.toString();
+  }
+
+  private static List<JsonNode> getListOfNodes(String json) throws Exception {
+    ObjectMapper objectMapper = new ObjectMapper();
+    JsonNode rootNode = objectMapper.readTree(json);
+
+    ArrayList<JsonNode> nodes = new ArrayList<>();
+    if (rootNode.isArray()) {
+      for (final JsonNode node : rootNode) {
+        nodes.add(node);
+      }
+    }
+    return nodes;
+  }
+
+  @Test
+  public void testApiServletHistoricalQueries() throws Exception {
+    String historicalQueriesRoute = "/queries/historical";
+
+    final SessionHandle handle =
+        sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1",
+            new HashMap());
+
+    String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+        + " = false";
+    OperationHandle opHandle = client.executeStatement(handle, queryString, new HashMap());
+    client.closeOperation(opHandle);
+
+    opHandle = client.executeStatement(handle, "SELECT 1", new HashMap());
+    client.closeOperation(opHandle);
+
+    String queriesResponse = readFromUrl(apiBaseURL + historicalQueriesRoute);
+    List<JsonNode> historicalQueries = getListOfNodes(queriesResponse);
+    Assert.assertTrue(historicalQueries.size() == 1);
+
+    JsonNode historicalQuery = historicalQueries.get(0);
+    Assert.assertEquals(historicalQuery.path("running").asBoolean(), false);
+    Assert.assertEquals(historicalQuery.path("state").asText(), "FINISHED");
+    Assert.assertTrue(historicalQuery.path("runtime").canConvertToInt());
+    Assert.assertTrue(historicalQuery.path("queryDisplay").isObject());
+  }
+
+  @Test
+  public void testApiServletActiveSessions() throws Exception {
+    String sessionsRoute = "/sessions";
+
+    String initNoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute);
+    Assert.assertTrue("[]".equals(initNoSessionsResponse));
+
+    SessionHandle handle1 =
+        sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1",
+            new HashMap());
+
+    String oneSessionResponse = readFromUrl(apiBaseURL + sessionsRoute);
+
+    List<JsonNode> sessionNodes = getListOfNodes(oneSessionResponse);
+    Assert.assertEquals(sessionNodes.size(), 1);
+
+    JsonNode session = sessionNodes.get(0);
+    Assert.assertEquals(session.path("sessionId").asText(), handle1.getSessionId().toString());
+    Assert.assertEquals(session.path("username").asText(), "user");
+    Assert.assertEquals(session.path("ipAddress").asText(), "127.0.0.1");
+    Assert.assertEquals(session.path("operationCount").asInt(), 0);
+    Assert.assertTrue(session.path("activeTime").canConvertToInt());
+    Assert.assertTrue(session.path("idleTime").canConvertToInt());
+
+    SessionHandle handle2 = sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1",
+        new HashMap());
+
+    String twoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute);
+    List<JsonNode> twoSessionsNodes = getListOfNodes(twoSessionsResponse);
+    Assert.assertEquals(twoSessionsNodes.size(), 2);
+
+    sm.closeSession(handle1);
+    sm.closeSession(handle2);
+
+    String endNoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute);
+    Assert.assertTrue("[]".equals(endNoSessionsResponse));
+  }
+
+  @Test
+  public void testWrongApiVersion() throws Exception {
+    String wrongApiVersionUrl = "http://localhost:" + webUIPort + "/api/v2";
+    URL url = new URL(wrongApiVersionUrl);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    Assert.assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
+  }
+
+  @Test
+  public void testWrongRoute() throws Exception {
+    String wrongRouteUrl = "http://localhost:" + webUIPort + "/api/v1/nonexistingRoute";
+    URL url = new URL(wrongRouteUrl);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    Assert.assertEquals(HttpURLConnection.HTTP_NOT_FOUND, conn.getResponseCode());
+  }
+
   @Test
   public void testContextRootUrlRewrite() throws Exception {
     String datePattern = "[a-zA-Z]{3} [a-zA-Z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}";
@@ -145,18 +274,18 @@ public class TestHS2HttpServer {
     CloseableHttpClient httpclient = null;
     try {
       httpclient = HttpClients.createDefault();
-      HttpGet httpGet = new HttpGet("http://localhost:"+webUIPort+"/conf");
+      HttpGet httpGet = new HttpGet("http://localhost:" + webUIPort + "/conf");
       CloseableHttpResponse response1 = httpclient.execute(httpGet);
 
       try {
         HttpEntity entity1 = response1.getEntity();
         BufferedReader br = new BufferedReader(new InputStreamReader(entity1.getContent()));
         String line;
-        while ((line = br.readLine())!= null) {
-          if (line.contains(metastorePasswd)){
+        while ((line = br.readLine()) != null) {
+          if (line.contains(metastorePasswd)) {
             pwdValFound = line;
           }
-          if (line.contains(ConfVars.METASTOREPWD.varname)){
+          if (line.contains(ConfVars.METASTOREPWD.varname)) {
             pwdKeyFound = line;
           }
         }
@@ -165,7 +294,7 @@ public class TestHS2HttpServer {
         response1.close();
       }
     } finally {
-      if (httpclient != null){
+      if (httpclient != null) {
         httpclient.close();
       }
     }