You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2020/09/02 08:36:04 UTC

[GitHub] [zeppelin] Reamer commented on a change in pull request #3887: [ZEPPELIN-4981]. Zeppelin Client API

Reamer commented on a change in pull request #3887:
URL: https://github.com/apache/zeppelin/pull/3887#discussion_r481851279



##########
File path: zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.GetRequest;
+import kong.unirest.HttpResponse;
+import kong.unirest.JsonNode;
+import kong.unirest.Unirest;
+import kong.unirest.apache.ApacheClient;
+import kong.unirest.json.JSONArray;
+import kong.unirest.json.JSONObject;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import unirest.shaded.org.apache.http.client.HttpClient;
+import unirest.shaded.org.apache.http.impl.client.HttpClients;
+
+import javax.net.ssl.SSLContext;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Low level api for interacting with Zeppelin. Underneath, it uses the Zeppelin REST API.
+ * You can use this class to operate Zeppelin note/paragraph,
+ * e.g. get/add/delete/update/execute/cancel
+ */
+public class ZeppelinClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinClient.class);
+
+  private ClientConfig clientConfig;
+
+  /**
+   * Create a client for the Zeppelin server described by the given config.
+   * The rest base url (and, for Knox, a custom http client) is set via {@code Unirest.config()}.
+   *
+   * @param clientConfig supplies the Zeppelin rest url and whether Knox is used
+   * @throws Exception if the custom http client for Knox can not be set up
+   */
+  public ZeppelinClient(ClientConfig clientConfig) throws Exception {
+    this.clientConfig = clientConfig;
+    Unirest.config().defaultBaseUrl(clientConfig.getZeppelinRestUrl() + "/api");
+
+    if (clientConfig.isUseKnox()) {
+      // SECURITY: the trust strategy below accepts every certificate chain and hostname
+      // verification is turned off, so the identity of the server is not checked at all.
+      LOGGER.warn("Knox is enabled: SSL certificate and hostname verification are disabled");
+      try {
+        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustSelfSignedStrategy() {
+          @Override
+          public boolean isTrusted(X509Certificate[] chain, String authType) {
+            return true;
+          }
+        }).build();
+        HttpClient customHttpClient = HttpClients.custom().setSSLContext(sslContext)
+                .setSSLHostnameVerifier(new NoopHostnameVerifier()).build();
+        Unirest.config().httpClient(ApacheClient.builder(customHttpClient));
+      } catch (Exception e) {
+        throw new Exception("Fail to setup httpclient of Unirest", e);
+      }
+    }
+  }
+
+  /**
+   * @return the config this client was created with
+   */
+  public ClientConfig getClientConfig() {
+    return this.clientConfig;
+  }
+
+  /**
+   * Throw exception if the status code is not 200.
+   *
+   * @param response
+   * @throws Exception
+   */
+  private void checkResponse(HttpResponse<JsonNode> response) throws Exception {
+    if (response.getStatus() == 302) {

Review comment:
       302 is the answer to a redirection. 401 is the status code for Unauthorized. Are we responding with an incorrect status code from the Zeppelin server?

##########
File path: zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.GetRequest;
+import kong.unirest.HttpResponse;
+import kong.unirest.JsonNode;
+import kong.unirest.Unirest;
+import kong.unirest.apache.ApacheClient;
+import kong.unirest.json.JSONArray;
+import kong.unirest.json.JSONObject;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import unirest.shaded.org.apache.http.client.HttpClient;
+import unirest.shaded.org.apache.http.impl.client.HttpClients;
+
+import javax.net.ssl.SSLContext;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Low level api for interacting with Zeppelin. Underneath, it uses the Zeppelin REST API.
+ * You can use this class to operate Zeppelin note/paragraph,
+ * e.g. get/add/delete/update/execute/cancel
+ */
+public class ZeppelinClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinClient.class);
+
+  private ClientConfig clientConfig;
+
+  /**
+   * Create a client for the Zeppelin server described by the given config.
+   * The rest base url (and, for Knox, a custom http client) is set via {@code Unirest.config()}.
+   *
+   * @param clientConfig supplies the Zeppelin rest url and whether Knox is used
+   * @throws Exception if the custom http client for Knox can not be set up
+   */
+  public ZeppelinClient(ClientConfig clientConfig) throws Exception {
+    this.clientConfig = clientConfig;
+    Unirest.config().defaultBaseUrl(clientConfig.getZeppelinRestUrl() + "/api");
+
+    if (clientConfig.isUseKnox()) {
+      // SECURITY: the trust strategy below accepts every certificate chain and hostname
+      // verification is turned off, so the identity of the server is not checked at all.
+      LOGGER.warn("Knox is enabled: SSL certificate and hostname verification are disabled");
+      try {
+        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustSelfSignedStrategy() {
+          @Override
+          public boolean isTrusted(X509Certificate[] chain, String authType) {
+            return true;
+          }
+        }).build();
+        HttpClient customHttpClient = HttpClients.custom().setSSLContext(sslContext)
+                .setSSLHostnameVerifier(new NoopHostnameVerifier()).build();
+        Unirest.config().httpClient(ApacheClient.builder(customHttpClient));
+      } catch (Exception e) {
+        throw new Exception("Fail to setup httpclient of Unirest", e);
+      }
+    }
+  }
+
+  /**
+   * @return the config this client was created with
+   */
+  public ClientConfig getClientConfig() {
+    return this.clientConfig;
+  }
+
+  /**
+   * Throw exception if the status code is not 200.
+   * Status 302 is reported as "Please login first"
+   * (presumably Zeppelin redirects unauthenticated requests to the login page -- TODO confirm).
+   * Any other non-200 status is reported together with its status text and, when the
+   * response carries a json object with a `message` field, that message.
+   *
+   * @param response the rest api response to check
+   * @throws Exception if the status code is not 200
+   */
+  private void checkResponse(HttpResponse<JsonNode> response) throws Exception {
+    int status = response.getStatus();
+    if (status == 302) {
+      throw new Exception("Please login first");
+    }
+    if (status != 200) {
+      // An error response does not always carry a json object with a `message` field
+      // (e.g. an empty or non-json body). Read it defensively so that the real http status
+      // is reported instead of a NullPointerException/JSONException.
+      String message = "";
+      JsonNode body = response.getBody();
+      if (body != null && body.getObject() != null) {
+        message = body.getObject().optString("message", "");
+      }
+      throw new Exception(String.format("Unable to call rest api, status: %s, statusText: %s, message: %s",
+              status,
+              response.getStatusText(),
+              message));
+    }
+  }
+
+  /**
+   * Verify the `status` field of a json response.
+   * Does nothing when it equals `OK` (ignoring case), otherwise throws an exception
+   * whose message is the java-unescaped `message` field of the response.
+   *
+   * @param jsonNode json body of a rest api response
+   * @throws Exception if the status in the json object is not `OK`
+   */
+  private void checkJsonNodeStatus(JsonNode jsonNode) throws Exception {
+    JSONObject responseObject = jsonNode.getObject();
+    String status = responseObject.getString("status");
+    if ("OK".equalsIgnoreCase(status)) {
+      return;
+    }
+    String message = responseObject.getString("message");
+    throw new Exception(StringEscapeUtils.unescapeJava(message));
+  }
+
+  /**
+   * Ask the Zeppelin server for its version via GET /version.
+   *
+   * @return the `version` string found in the `body` of the response
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public String getVersion() throws Exception {
+    HttpResponse<JsonNode> response = Unirest.get("/version").asJson();
+    checkResponse(response);
+
+    JsonNode responseJson = response.getBody();
+    checkJsonNodeStatus(responseJson);
+
+    JSONObject versionObject = responseJson.getObject().getJSONObject("body");
+    return versionObject.getString("version");
+  }
+
+  /**
+   * Request a new session id via POST /session. It doesn't create session (interpreter process)
+   * in zeppelin server side, but just create an unique session id.
+   *
+   * @param interpreter sent as the `interpreter` query parameter
+   * @return {@link SessionInfo} built from the `body` of the response
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public SessionInfo newSession(String interpreter) throws Exception {
+    HttpResponse<JsonNode> response =
+            Unirest.post("/session").queryString("interpreter", interpreter).asJson();
+    checkResponse(response);
+
+    JsonNode responseJson = response.getBody();
+    checkJsonNodeStatus(responseJson);
+
+    JSONObject sessionObject = responseJson.getObject().getJSONObject("body");
+    return new SessionInfo(sessionObject);
+  }
+
+  /**
+   * Stop the session(interpreter process) in Zeppelin server via DELETE /session/{sessionId}.
+   *
+   * @param sessionId id of the session to stop
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public void stopSession(String sessionId) throws Exception {
+    HttpResponse<JsonNode> response =
+            Unirest.delete("/session/{sessionId}").routeParam("sessionId", sessionId).asJson();
+    checkResponse(response);
+    checkJsonNodeStatus(response.getBody());
+  }
+
+  /**
+   * Fetch the session info of the provided sessionId via GET /session/{sessionId}.
+   *
+   * @param sessionId id of the session to look up
+   * @return {@link SessionInfo} built from the `body` of the response
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public SessionInfo getSession(String sessionId) throws Exception {
+    HttpResponse<JsonNode> response =
+            Unirest.get("/session/{sessionId}").routeParam("sessionId", sessionId).asJson();
+    checkResponse(response);
+
+    JsonNode responseJson = response.getBody();
+    checkJsonNodeStatus(responseJson);
+
+    return new SessionInfo(responseJson.getObject().getJSONObject("body"));
+  }
+
+  /**
+   * List all the sessions.
+   *
+   * @return
+   * @throws Exception
+   */
+  public List<SessionInfo> listSessions() throws Exception {
+    return listSessions(null);
+  }
+
+  /**
+   * List the sessions via GET /session, optionally restricted to one interpreter.
+   *
+   * @param interpreter sent as the `interpreter` query parameter, left out when null
+   * @return one {@link SessionInfo} per element of the `body` array of the response
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public List<SessionInfo> listSessions(String interpreter) throws Exception {
+    GetRequest request = Unirest.get("/session");
+    if (interpreter != null) {
+      request = request.queryString("interpreter", interpreter);
+    }
+
+    HttpResponse<JsonNode> response = request.asJson();
+    checkResponse(response);
+    JsonNode responseJson = response.getBody();
+    checkJsonNodeStatus(responseJson);
+
+    JSONArray sessions = responseJson.getObject().getJSONArray("body");
+    List<SessionInfo> result = new ArrayList<>();
+    for (int idx = 0; idx < sessions.length(); idx++) {
+      JSONObject sessionObject = sessions.getJSONObject(idx);
+      result.add(new SessionInfo(sessionObject));
+    }
+    return result;
+  }
+
+  /**
+   * Login zeppelin with userName and password, throw exception if login fails.
+   *
+   * @param userName
+   * @param password
+   * @throws Exception
+   */
+  public void login(String userName, String password) throws Exception {
+    if (clientConfig.isUseKnox()) {
+      HttpResponse<String> response = Unirest.get("/")
+              .basicAuth(userName, password)
+              .asString();
+      if (response.getStatus() != 200) {
+        throw new Exception(String.format("Login failed, status: %s, statusText: %s",
+                response.getStatus(),
+                response.getStatusText()));
+      }
+    } else {
+      HttpResponse<JsonNode> response = Unirest
+              .post("/login")
+              .field("userName", userName)
+              .field("password", password)
+              .asJson();
+      if (response.getStatus() != 200) {
+        throw new Exception(String.format("Login failed, status: %s, statusText: %s",
+                response.getStatus(),
+                response.getStatusText()));
+      }
+    }
+  }
+
+  public String createNote(String notePath) throws Exception {
+    return createNote(notePath, "");
+  }
+
+  /**
+   * Create a new empty note via POST /notebook.
+   *
+   * @param notePath sent as the `name` of the new note
+   * @param defaultInterpreterGroup sent as the `defaultInterpreterGroup` of the new note
+   * @return the `body` string of the response (presumably the id of the new note -- TODO confirm)
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public String createNote(String notePath, String defaultInterpreterGroup) throws Exception {
+    JSONObject requestBody = new JSONObject();
+    requestBody.put("name", notePath);
+    requestBody.put("defaultInterpreterGroup", defaultInterpreterGroup);
+    String requestJson = requestBody.toString();
+
+    HttpResponse<JsonNode> response = Unirest.post("/notebook").body(requestJson).asJson();
+    checkResponse(response);
+    JsonNode responseJson = response.getBody();
+    checkJsonNodeStatus(responseJson);
+
+    return responseJson.getObject().getString("body");
+  }
+
+  /**
+   * Delete the note with provided noteId via DELETE /notebook/{noteId}.
+   *
+   * @param noteId id of the note to delete
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public void deleteNote(String noteId) throws Exception {
+    HttpResponse<JsonNode> response =
+            Unirest.delete("/notebook/{noteId}").routeParam("noteId", noteId).asJson();
+    checkResponse(response);
+    checkJsonNodeStatus(response.getBody());
+  }
+
+  /**
+   * Query {@link NoteResult} with provided noteId via GET /notebook/{noteId}.
+   * The running flag is read from `info.isRunning` (false when absent) and one
+   * {@link ParagraphResult} is created per element of `paragraphs` (none when absent).
+   *
+   * @param noteId id of the note to query
+   * @return the {@link NoteResult} built from the response
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public NoteResult queryNoteResult(String noteId) throws Exception {
+    HttpResponse<JsonNode> response =
+            Unirest.get("/notebook/{noteId}").routeParam("noteId", noteId).asJson();
+    checkResponse(response);
+    JsonNode responseJson = response.getBody();
+    checkJsonNodeStatus(responseJson);
+
+    JSONObject noteJson = responseJson.getObject().getJSONObject("body");
+
+    boolean running = false;
+    if (noteJson.has("info")) {
+      JSONObject info = noteJson.getJSONObject("info");
+      running = info.has("isRunning") && Boolean.parseBoolean(info.getString("isRunning"));
+    }
+
+    List<ParagraphResult> paragraphResults = new ArrayList<>();
+    if (noteJson.has("paragraphs")) {
+      JSONArray paragraphs = noteJson.getJSONArray("paragraphs");
+      for (int idx = 0; idx < paragraphs.length(); idx++) {
+        JSONObject paragraphJson = paragraphs.getJSONObject(idx);
+        paragraphResults.add(new ParagraphResult(paragraphJson));
+      }
+    }
+
+    return new NoteResult(noteId, running, paragraphResults);
+  }
+
+  /**
+   * Execute note with provided noteId, return until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult executeNote(String noteId) throws Exception {
+    return executeNote(noteId, new HashMap<>());
+  }
+
+  /**
+   * Execute note with provided noteId and parameters, return until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @param parameters
+   * @return
+   * @throws Exception
+   */
+  public NoteResult executeNote(String noteId, Map<String, String> parameters) throws Exception {
+    submitNote(noteId, parameters);
+    return waitUntilNoteFinished(noteId);
+  }
+
+  /**
+   * Submit note to execute with provided noteId, return at once the submission is completed.
+   * You need to query {@link NoteResult} by yourself afterwards until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult submitNote(String noteId) throws Exception  {
+    return submitNote(noteId, new HashMap<>());
+  }
+
+  /**
+   * Submit note to execute with provided noteId and parameters via POST /notebook/job/{noteId},
+   * return at once the submission is completed.
+   * You need to query {@link NoteResult} by yourself afterwards until note execution is completed.
+   * Interpreter process will be stopped after note execution.
+   *
+   * @param noteId id of the note to submit
+   * @param parameters sent as `params` in the json request body
+   * @return the {@link NoteResult} queried right after the submission
+   * @throws Exception if the rest call fails or the response status is not `OK`
+   */
+  public NoteResult submitNote(String noteId, Map<String, String> parameters) throws Exception  {
+    JSONObject bodyObject = new JSONObject();
+    bodyObject.put("params", parameters);
+    // run note in non-blocking and isolated way.
+    // The body is sent as a json string, the same way createNote does it, so that the payload
+    // does not depend on which body(...) overload the JSONObject would resolve to.
+    HttpResponse<JsonNode> response = Unirest
+            .post("/notebook/job/{noteId}")
+            .routeParam("noteId", noteId)
+            .queryString("blocking", "false")
+            .queryString("isolated", "true")
+            .body(bodyObject.toString())
+            .asJson();
+    checkResponse(response);
+    JsonNode jsonNode = response.getBody();
+    checkJsonNodeStatus(jsonNode);
+    return queryNoteResult(noteId);
+  }
+
+  /**
+   * Block there until note execution is completed.
+   *
+   * @param noteId
+   * @return
+   * @throws Exception
+   */
+  public NoteResult waitUntilNoteFinished(String noteId) throws Exception {

Review comment:
       This function can lead to an infinite loop. I would call `waitUntilNoteFinished(String noteId, long timeoutInMills)` with a high default timeout.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org