You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/09/09 07:05:36 UTC
[zeppelin] branch master updated: [ZEPPELIN-4981]. Zeppelin Client
API
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new f11e6f3 [ZEPPELIN-4981]. Zeppelin Client API
f11e6f3 is described below
commit f11e6f35bd4da1a73ef5d96b548216ee161c439a
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Sep 8 15:18:28 2020 +0800
[ZEPPELIN-4981]. Zeppelin Client API
### What is this PR for?
This is the initial PR for introducing the Zeppelin client API. You can check this Google doc for the overall design: https://docs.google.com/document/d/1bLLKKxleZlZpP9EFJlLLkJKwDBps-RNvzNwh3LFZWZ4/edit?usp=sharing
In this PR, I add 2 modules: zeppelin-client, zeppelin-client-examples.
You can take a look at the module zeppelin-client-examples to see how we can use zeppelin client api.
### What type of PR is it?
[ Feature ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4981
### How should this be tested?
* Manually tested, and unit tests are added.
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3887 from zjffdu/ZEPPELIN-4981 and squashes the following commits:
9b8ff4244 [Jeff Zhang] update travis
8eddf3e09 [Jeff Zhang] use ExceptionMapper
bb0537129 [Jeff Zhang] update
4205c1c5e [Jeff Zhang] code refactoring
53fe1c13d [Jeff Zhang] update examples
487cb8c05 [Jeff Zhang] minor update
d7d11f77c [Jeff Zhang] minor update
f5e1e2e76 [Jeff Zhang] add session reconnect
bc723e766 [Jeff Zhang] minor update
7fbd48147 [Jeff Zhang] minor update
fe22986ff [Jeff Zhang] add sessionId to ZSession
c7701765a [Jeff Zhang] add session status api
1dbc414ba [Jeff Zhang] move classes to websocket folder
0e43ce1f9 [Jeff Zhang] add more examples
39d230b29 [Jeff Zhang] fix ut
d4bbe9522 [Jeff Zhang] add zeppelin client examples
5ca49953a [Jeff Zhang] update
5611b7b0e [Jeff Zhang] resolve gson dependency issue
1aa1d68da [Jeff Zhang] [ZEPPELIN-4981]. Zeppelin Client API (Zeppelin SDK)
---
.travis.yml | 21 +
pom.xml | 2 +
spark/pom.xml | 2 +
zeppelin-client-examples/pom.xml | 83 +++
.../client/examples/FlinkAdvancedExample.java | 111 +++
.../client/examples/FlinkAdvancedExample2.java | 110 +++
.../zeppelin/client/examples/FlinkExample.java | 103 +++
.../zeppelin/client/examples/HiveExample.java | 78 +++
.../zeppelin/client/examples/PrestoExample.java | 79 +++
.../zeppelin/client/examples/PythonExample.java | 96 +++
.../apache/zeppelin/client/examples/RExample.java | 93 +++
.../client/examples/SparkAdvancedExample.java | 102 +++
.../zeppelin/client/examples/SparkExample.java | 112 +++
.../client/examples/ZeppelinClientExample.java | 71 ++
.../client/examples/ZeppelinClientExample2.java | 71 ++
.../src/main/resources/init_stream.scala | 63 ++
.../src/main/resources/log4j.properties | 22 +
zeppelin-client/pom.xml | 113 ++++
.../org/apache/zeppelin/client/ClientConfig.java | 62 ++
.../org/apache/zeppelin/client/ExecuteResult.java | 79 +++
.../org/apache/zeppelin/client/NoteResult.java | 40 +-
.../apache/zeppelin/client/ParagraphResult.java | 122 ++++
.../java/org/apache/zeppelin/client/Result.java | 42 +-
.../org/apache/zeppelin/client/SessionInfo.java | 95 +++
.../java/org/apache/zeppelin/client/Status.java | 42 +-
.../java/org/apache/zeppelin/client/ZSession.java | 504 ++++++++++++++
.../org/apache/zeppelin/client/ZeppelinClient.java | 747 +++++++++++++++++++++
.../client/websocket/AbstractMessageHandler.java | 87 +++
.../client/websocket/CompositeMessageHandler.java | 59 ++
.../client/websocket/JsonSerializable.java | 27 +-
.../apache/zeppelin/client/websocket/Message.java | 288 ++++++++
.../zeppelin/client/websocket/MessageHandler.java | 25 +-
.../client/websocket/SimpleMessageHandler.java | 30 +-
.../client/websocket/StatementMessageHandler.java | 45 +-
.../client/websocket/ZeppelinWebSocketClient.java | 122 ++++
.../src/test/resources/log4j.properties | 3 -
zeppelin-interpreter-integration/pom.xml | 12 +
.../integration/ZSessionIntegrationTest.java | 507 ++++++++++++++
.../integration/ZeppelinClientIntegrationTest.java | 378 +++++++++++
.../ZeppelinClientWithAuthIntegrationTest.java | 107 +++
.../src/test/resources/log4j.properties | 4 +-
.../zeppelin/interpreter/ExecutionContext.java | 13 +-
.../interpreter/ExecutionContextBuilder.java | 9 +-
.../apache/zeppelin/rest/InterpreterRestApi.java | 30 +-
.../org/apache/zeppelin/rest/NotebookRestApi.java | 189 +++---
.../org/apache/zeppelin/rest/SessionManager.java | 175 +++++
.../org/apache/zeppelin/rest/SessionRestApi.java | 106 +++
.../exception/WebApplicationExceptionMapper.java | 12 +-
.../zeppelin/rest/message/NewNoteRequest.java | 16 +-
.../rest/message/RestartInterpreterRequest.java | 8 +-
.../apache/zeppelin/rest/message/SessionInfo.java | 86 +++
.../rest/message/gson/ExceptionSerializer.java | 3 +
.../apache/zeppelin/service/NotebookService.java | 55 +-
.../org/apache/zeppelin/socket/NotebookServer.java | 4 +-
.../zeppelin/socket/NotebookWebSocketCreator.java | 1 +
.../apache/zeppelin/rest/AbstractTestRestApi.java | 1 +
.../apache/zeppelin/rest/NotebookRestApiTest.java | 21 +-
.../zeppelin/rest/NotebookSecurityRestApiTest.java | 8 +-
.../apache/zeppelin/rest/ZeppelinRestApiTest.java | 2 +-
.../zeppelin/service/NotebookServiceTest.java | 20 +-
.../zeppelin/interpreter/InterpreterSetting.java | 8 +
.../interpreter/RemoteInterpreterEventServer.java | 3 +-
.../remote/RemoteInterpreterProcess.java | 10 +
.../java/org/apache/zeppelin/notebook/Note.java | 14 +-
.../org/apache/zeppelin/notebook/Paragraph.java | 27 +-
65 files changed, 5273 insertions(+), 307 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index faf1b0c..7c50f9b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -215,6 +215,27 @@ jobs:
- mvn test -DskipRat -pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-shaded,${INTERPRETERS} | sed 's/!//g') -Pscala-2.10 -B
+ - name: "Test zeppelin-client integration test"
+ jdk: "openjdk8"
+ dist: xenial
+ before_install:
+ - export PYTHON=3
+ - export R=true
+ - echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
+ - bash -x ./testing/install_external_dependencies.sh
+ - source ~/.environ
+ install:
+ - mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown,flink/interpreter,jdbc,shell -am
+ - mvn clean package -T 2C -pl zeppelin-plugins -amd -B
+ before_script:
+ - echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh
+ - echo "export SPARK_PRINT_LAUNCH_COMMAND=true" >> conf/zeppelin-env.sh
+ - export SPARK_PRINT_LAUNCH_COMMAND=true
+ - tail conf/zeppelin-env.sh
+ script:
+ - mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
+
+
- name: "Test flink 1.10 & flink integration test"
jdk: "openjdk8"
dist: xenial
diff --git a/pom.xml b/pom.xml
index 47e6e02..78f66de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,8 @@
<module>geode</module>
<module>ksql</module>
<module>sparql</module>
+ <module>zeppelin-client</module>
+ <module>zeppelin-client-examples</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-jupyter</module>
diff --git a/spark/pom.xml b/spark/pom.xml
index 25301b2..022c3cd 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -42,6 +42,8 @@
<!-- spark versions -->
<spark.version>2.4.5</spark.version>
+ <protobuf.version>2.5.0</protobuf.version>
+ <py4j.version>0.10.7</py4j.version>
<spark.scala.version>2.11.12</spark.scala.version>
<spark.scala.binary.version>2.11</spark.scala.binary.version>
diff --git a/zeppelin-client-examples/pom.xml b/zeppelin-client-examples/pom.xml
new file mode 100644
index 0000000..8817635
--- /dev/null
+++ b/zeppelin-client-examples/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-client-examples</artifactId>
+ <packaging>jar</packaging>
+ <version>0.9.0-SNAPSHOT</version>
+ <name>Zeppelin: Client Examples</name>
+ <description>Zeppelin Client Examples</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample.java
new file mode 100644
index 0000000..64b6184
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Advanced example of running Flink streaming SQL via the session API.
+ * The streaming output is captured via SimpleMessageHandler.
+ */
+public class FlinkAdvancedExample {
+ public static void main(String[] args) {
+
+ ZSession session = null;
+ try {
+ ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+ Map<String, String> intpProperties = new HashMap<>();
+
+ session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("flink")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ // if MessageHandler is specified, then websocket is enabled.
+ // you can get continuous output from Zeppelin via websocket.
+ session.start(new SimpleMessageHandler());
+ System.out.println("Flink Web UI: " + session.getWeburl());
+
+ String code = "benv.fromElements(1,2,3,4,5,6,7,8,9,10).map(e=> {Thread.sleep(1000); e}).print()";
+ System.out.println("Submit code: " + code);
+ // use submit to run flink code in a non-blocking way, then poll the statement until it completes.
+ ExecuteResult result = session.submit(code);
+ System.out.println("Job status: " + result.getStatus());
+ while (!result.getStatus().isCompleted()) {
+ result = session.queryStatement(result.getStatementId());
+ System.out.println("Job status: " + result.getStatus() + ", progress: " + result.getProgress());
+ Thread.sleep(1000);
+ }
+ System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+ System.out.println("-----------------------------------------------------------------------------");
+ System.out.println("Submit code: " + code);
+ result = session.submit(code); // submit exactly the code that was just printed
+ System.out.println("Job status: " + result.getStatus());
+ result = session.waitUntilFinished(result.getStatementId());
+ System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+ System.out.println("-----------------------------------------------------------------------------");
+ code = "for(i <- 1 to 10) {\n" +
+ " Thread.sleep(1000)\n" +
+ " println(i)\n" +
+ "}";
+ System.out.println("Submit code: " + code);
+ result = session.execute(code);
+ System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+ System.out.println("-----------------------------------------------------------------------------");
+ String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
+ result = session.execute(initCode);
+ System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+ // run flink ssql
+ Map<String, String> localProperties = new HashMap<>();
+ localProperties.put("type", "update");
+ result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+ session.waitUntilFinished(result.getStatementId());
+
+ result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+ session.waitUntilRunning(result.getStatementId());
+ Thread.sleep(10 * 1000);
+ System.out.println("Try to cancel statement: " + result.getStatementId());
+ session.cancel(result.getStatementId());
+ result = session.waitUntilFinished(result.getStatementId()); // keep the fresh result, not the one from submit
+ System.out.println("Job status: " + result.getStatus());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (session != null) {
+ try {
+ session.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample2.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample2.java
new file mode 100644
index 0000000..787bd13
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkAdvancedExample2.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.websocket.CompositeMessageHandler;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.StatementMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Advanced example of running Flink streaming SQL via the session API.
+ * The streaming output is captured via CompositeMessageHandler, which lets each
+ * submitted statement use its own StatementMessageHandler (here MyStatementMessageHandler1
+ * and MyStatementMessageHandler2, one per Flink job).
+ */
+public class FlinkAdvancedExample2 {
+ public static void main(String[] args) {
+
+ ZSession session = null;
+ try {
+ ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+ Map<String, String> intpProperties = new HashMap<>();
+
+ session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("flink")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ // CompositeMessageHandler allows you to add a StatementMessageHandler for each statement.
+ // otherwise you have to use a global MessageHandler.
+ session.start(new CompositeMessageHandler());
+ System.out.println("Flink Web UI: " + session.getWeburl());
+
+ System.out.println("-----------------------------------------------------------------------------");
+ String initCode = IOUtils.toString(FlinkAdvancedExample2.class.getResource("/init_stream.scala"));
+ ExecuteResult result = session.execute(initCode);
+ System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+ // run flink ssql
+ Map<String, String> localProperties = new HashMap<>();
+ localProperties.put("type", "update");
+ result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
+ new MyStatementMessageHandler1());
+ session.waitUntilFinished(result.getStatementId());
+
+ result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
+ new MyStatementMessageHandler2());
+ session.waitUntilFinished(result.getStatementId());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (session != null) {
+ try {
+ session.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ public static class MyStatementMessageHandler1 implements StatementMessageHandler {
+
+ @Override
+ public void onStatementAppendOutput(String statementId, int index, String output) {
+ System.out.println("MyStatementMessageHandler1, append output: " + output);
+ }
+
+ @Override
+ public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+ System.out.println("MyStatementMessageHandler1, update output: " + output);
+ }
+ }
+
+ public static class MyStatementMessageHandler2 implements StatementMessageHandler {
+
+ @Override
+ public void onStatementAppendOutput(String statementId, int index, String output) {
+ System.out.println("MyStatementMessageHandler2, append output: " + output);
+ }
+
+ @Override
+ public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+ System.out.println("MyStatementMessageHandler2, update output: " + output);
+ }
+ }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkExample.java
new file mode 100644
index 0000000..562156f
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/FlinkExample.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Basic example of running Flink code (Scala, SQL, Python) via the session API.
+ */
+public class FlinkExample {
+ public static void main(String[] args) {
+
+ ZSession session = null;
+ try {
+ ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+ Map<String, String> intpProperties = new HashMap<>();
+
+ session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("flink")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ session.start();
+ System.out.println("Flink Web UI: " + session.getWeburl());
+
+ // Scala statement producing a single result
+ ExecuteResult result = session.execute("benv.fromElements(1,2,3).print()");
+ System.out.println("Result: " + result.getResults().get(0).getData());
+
+ // Scala statements producing multiple results
+ result = session.execute("val data = benv.fromElements(1,2,3).map(e=>(e, e * 2))\n" +
+ "data.print()\n" +
+ "z.show(data)");
+
+ // The first result is the text output
+ System.out.println("Result 1: type: " + result.getResults().get(0).getType() +
+ ", data: " + result.getResults().get(0).getData() );
+ // The second result is the table output
+ System.out.println("Result 2: type: " + result.getResults().get(1).getType() +
+ ", data: " + result.getResults().get(1).getData() );
+ System.out.println("Flink Job Urls:\n" + StringUtils.join(result.getJobUrls(), "\n"));
+
+ // error output: print the status and data of a statement that is expected to fail
+ result = session.execute("1/0");
+ System.out.println("Result status: " + result.getStatus() +
+ ", data: " + result.getResults().get(0).getData());
+
+ // pyflink: run Python code, passing "pyflink" as the first argument
+ result = session.execute("pyflink", "type(b_env)");
+ System.out.println("benv: " + result.getResults().get(0).getData());
+ // matplotlib plot, passing "ipyflink" as the first argument
+ result = session.execute("ipyflink", "%matplotlib inline\n" +
+ "import matplotlib.pyplot as plt\n" +
+ "plt.plot([1,2,3,4])\n" +
+ "plt.ylabel('some numbers')\n" +
+ "plt.show()");
+ System.out.println("Matplotlib result, type: " + result.getResults().get(0).getType() +
+ ", data: " + result.getResults().get(0).getData());
+
+ // Flink SQL, passing "ssql" as the first argument
+ result = session.execute("ssql", "show tables");
+ System.out.println("Flink tables: " + result.getResults().get(0).getData());
+
+ // invalid Flink SQL, passing "bsql"; print the resulting status and data
+ result = session.execute("bsql", "select * from unknown_table");
+ System.out.println("Result status: " + result.getStatus() +
+ ", data: " + result.getResults().get(0).getData());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (session != null) {
+ try {
+ session.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/HiveExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/HiveExample.java
new file mode 100644
index 0000000..1e14a86
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/HiveExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Basic example of running Hive SQL via the session API.
+ * The job progress info can be captured via SimpleMessageHandler.
+ */
+public class HiveExample {
+
+ public static void main(String[] args) {
+
+ ZSession session = null;
+ try {
+ ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+ Map<String, String> intpProperties = new HashMap<>();
+
+ session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("hive")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ session.start(new SimpleMessageHandler());
+
+ // a single SQL statement
+ ExecuteResult result = session.execute("show databases");
+ System.out.println("show database result : " + result.getResults().get(0).getData());
+
+ // multiple SQL statements submitted in one call
+ result = session.execute("use tpch_text_5;\nshow tables");
+ System.out.println("show tables result: " + result.getResults().get(0).getData());
+
+ // select; you can see the Hive SQL job progress via SimpleMessageHandler
+ result = session.execute("select count(1) from lineitem");
+ System.out.println("Result status: " + result.getStatus() +
+ ", data: " + result.getResults().get(0).getData());
+
+ // invalid SQL; print the resulting status and data
+ result = session.execute("select * from unknown_table");
+ System.out.println("Result status: " + result.getStatus() +
+ ", data: " + result.getResults().get(0).getData());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (session != null) {
+ try {
+ session.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PrestoExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PrestoExample.java
new file mode 100644
index 0000000..5373b00
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PrestoExample.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Basic example of running Presto SQL via the session API.
+ */
+public class PrestoExample {
+
+ public static void main(String[] args) {
+
+ ZSession session = null;
+ try {
+ ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+ Map<String, String> intpProperties = new HashMap<>();
+
+ session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("presto")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ session.start(new SimpleMessageHandler());
+
+ // a single SQL statement
+ ExecuteResult result = session.execute("show schemas");
+ System.out.println("show schemas result : " + result.getResults().get(0).getData());
+
+ // multiple SQL statements submitted in one call
+ result = session.execute("use tpch_text_5;\nshow tables");
+ System.out.println("show tables result: " + result.getResults().get(0).getData());
+
+ // select; print the status together with the data
+ result = session.execute("select count(1) from lineitem");
+ System.out.println("Result status: " + result.getStatus() +
+ ", data: " + result.getResults().get(0).getData());
+
+ // invalid SQL; print the resulting status and data
+ result = session.execute("select * from unknown_table");
+ System.out.println("Result status: " + result.getStatus() +
+ ", data: " + result.getResults().get(0).getData());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (session != null) {
+ try {
+ session.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PythonExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PythonExample.java
new file mode 100644
index 0000000..6a17ffb
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/PythonExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Shows how to run python code through the session api ({@link ZSession}).
+ *
+ * <p>Statements are sent either to the session's default interpreter or, via the
+ * two-argument {@code execute}, to the "ipython" sub interpreter. The first result of
+ * each statement is printed to stdout.
+ */
+public class PythonExample {
+
+  public static void main(String[] args) {
+    ZSession session = null;
+    try {
+      ClientConfig config = new ClientConfig("http://localhost:8080");
+      // No extra interpreter properties are set; the map is handed over empty.
+      Map<String, String> interpreterProps = new HashMap<>();
+      session = ZSession.builder()
+          .setClientConfig(config)
+          .setInterpreter("python")
+          .setIntpProperties(interpreterProps)
+          .build();
+      session.start(new SimpleMessageHandler());
+
+      // one statement
+      ExecuteResult hello = session.execute("print('hello world')");
+      System.out.println(hello.getResults().get(0).getData());
+
+      // two statements in one call
+      ExecuteResult helloTwice = session.execute("print('hello world')\nprint('hello world2')");
+      System.out.println(helloTwice.getResults().get(0).getData());
+
+      // a statement that raises an error
+      ExecuteResult failure = session.execute("1/0");
+      String failureLine = "Result status: " + failure.getStatus()
+          + ", data: " + failure.getResults().get(0).getData();
+      System.out.println(failureLine);
+
+      // matplotlib
+      String plotCode = "%matplotlib inline\n" +
+          "import matplotlib.pyplot as plt\n" +
+          "plt.plot([1,2,3,4])\n" +
+          "plt.ylabel('some numbers')\n" +
+          "plt.show()";
+      ExecuteResult plot = session.execute("ipython", plotCode);
+      String plotLine = "Matplotlib result, type: " + plot.getResults().get(0).getType()
+          + ", data: " + plot.getResults().get(0).getData();
+      System.out.println(plotLine);
+
+      // pandas dataframe shown via z.show
+      String pandasCode = "import pandas as pd\n" +
+          "df = pd.DataFrame({'name':['a','b','c'], 'count':[12,24,18]})\n" +
+          "z.show(df)";
+      ExecuteResult pandas = session.execute("ipython", pandasCode);
+      String pandasLine = "Pandas dataframe result, type: " + pandas.getResults().get(0).getType()
+          + ", data: " + pandas.getResults().get(0).getData();
+      System.out.println(pandasLine);
+
+      // output produced step by step over several seconds
+      String streamingCode = "import time\n" +
+          "for i in range(1,10):\n" +
+          " print(i)\n" +
+          " time.sleep(1)";
+      ExecuteResult streaming = session.execute(streamingCode);
+      String streamingLine = "Python streaming result, type: "
+          + streaming.getResults().get(0).getType()
+          + ", data: " + streaming.getResults().get(0).getData();
+      System.out.println(streamingLine);
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      // Always try to stop a session that was created; a failure here is only printed.
+      try {
+        if (session != null) {
+          session.stop();
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/RExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/RExample.java
new file mode 100644
index 0000000..1407374
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/RExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Shows how to run r code through the session api ({@link ZSession}).
+ *
+ * <p>Statements are sent either to the session's default interpreter or, via the
+ * two-argument {@code execute}, to the "ir" sub interpreter. The first result of each
+ * statement is printed to stdout.
+ */
+public class RExample {
+
+  public static void main(String[] args) {
+    ZSession session = null;
+    try {
+      ClientConfig config = new ClientConfig("http://localhost:8080");
+      // No extra interpreter properties are set; the map is handed over empty.
+      Map<String, String> interpreterProps = new HashMap<>();
+      session = ZSession.builder()
+          .setClientConfig(config)
+          .setInterpreter("r")
+          .setIntpProperties(interpreterProps)
+          .build();
+      session.start(new SimpleMessageHandler());
+
+      // assign a vector and print it
+      String vectorCode = "bare <- c(1, 2.5, 4)\n" +
+          "print(bare)";
+      ExecuteResult vector = session.execute(vectorCode);
+      System.out.println(vector.getResults().get(0).getData());
+
+      // error output
+      ExecuteResult division = session.execute("1/0");
+      String divisionLine = "Result status: " + division.getStatus()
+          + ", data: " + division.getResults().get(0).getData();
+      System.out.println(divisionLine);
+
+      // base R plotting
+      ExecuteResult pairs = session.execute("ir", "pairs(iris)");
+      String pairsLine = "R plotting result, type: " + pairs.getResults().get(0).getType()
+          + ", data: " + pairs.getResults().get(0).getData();
+      System.out.println(pairsLine);
+
+      // ggplot2
+      String ggplotCode = "library(ggplot2)\n" +
+          "ggplot(mpg, aes(displ, hwy, colour = class)) + \n" +
+          " geom_point()";
+      ExecuteResult ggplot = session.execute("ir", ggplotCode);
+      String ggplotLine = "ggplot2 plotting result, type: " + ggplot.getResults().get(0).getType()
+          + ", data: " + ggplot.getResults().get(0).getData();
+      System.out.println(ggplotLine);
+
+      // googlevis
+      String googleVisCode = "library(googleVis)\n" +
+          "df=data.frame(country=c(\"US\", \"GB\", \"BR\"), \n" +
+          " val1=c(10,13,14), \n" +
+          " val2=c(23,12,32))\n" +
+          "Bar <- gvisBarChart(df)\n" +
+          "print(Bar, tag = 'chart')";
+      ExecuteResult googleVis = session.execute("ir", googleVisCode);
+      String googleVisLine = "googlevis plotting result, type: "
+          + googleVis.getResults().get(0).getType()
+          + ", data: " + googleVis.getResults().get(0).getData();
+      System.out.println(googleVisLine);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      // Always try to stop a session that was created; a failure here is only printed.
+      try {
+        if (session != null) {
+          session.stop();
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkAdvancedExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkAdvancedExample.java
new file mode 100644
index 0000000..ec0933f
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkAdvancedExample.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Advanced example of run spark code via session api.
+ *
+ * <p>Demonstrates, in order: submitting a statement and polling its status, submitting
+ * and blocking until it is finished, submitting and cancelling a running statement, and
+ * a blocking {@code execute} of code that prints output over several seconds.
+ */
+public class SparkAdvancedExample {
+
+  // Printed between the individual scenarios to keep the console output readable.
+  private static final String SEPARATOR =
+      "-----------------------------------------------------------------------------";
+
+  public static void main(String[] args) {
+
+    ZSession session = null;
+    try {
+      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+      Map<String, String> intpProperties = new HashMap<>();
+      intpProperties.put("spark.master", "local[*]");
+
+      session = ZSession.builder()
+          .setClientConfig(clientConfig)
+          .setInterpreter("spark")
+          .setIntpProperties(intpProperties)
+          .build();
+
+      // if MessageHandler is specified, then websocket is enabled.
+      // you can get continuous output from Zeppelin via websocket.
+      session.start(new SimpleMessageHandler());
+      System.out.println("Spark Web UI: " + session.getWeburl());
+
+      // The same snippet is reused by the first three scenarios, so the text that is
+      // printed as "Submit code" is always exactly what gets submitted.
+      String code = "sc.range(1,10).map(e=> {Thread.sleep(2000); e}).sum()";
+      System.out.println("Submit code: " + code);
+      // use submit to run spark code in non-blocking way.
+      ExecuteResult result = session.submit(code);
+      System.out.println("Job status: " + result.getStatus());
+      while (!result.getStatus().isCompleted()) {
+        // Wait before polling so that no extra sleep happens once the job is completed.
+        Thread.sleep(1000);
+        result = session.queryStatement(result.getStatementId());
+        System.out.println("Job status: " + result.getStatus() + ", progress: " + result.getProgress());
+      }
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      // submit, then block until the statement is finished
+      System.out.println(SEPARATOR);
+      System.out.println("Submit code: " + code);
+      result = session.submit(code);
+      System.out.println("Job status: " + result.getStatus());
+      result = session.waitUntilFinished(result.getStatementId());
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      // submit, wait until it is running, then cancel it
+      System.out.println(SEPARATOR);
+      System.out.println("Submit code: " + code);
+      result = session.submit(code);
+      System.out.println("Job status: " + result.getStatus());
+      session.waitUntilRunning(result.getStatementId());
+      System.out.println("Try to cancel statement: " + result.getStatementId());
+      session.cancel(result.getStatementId());
+      result = session.waitUntilFinished(result.getStatementId());
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+      // blocking execute of code that prints one line per second
+      System.out.println(SEPARATOR);
+      code = "for(i <- 1 to 10) {\n" +
+          " Thread.sleep(1000)\n" +
+          " println(i)\n" +
+          "}";
+      System.out.println("Submit code: " + code);
+      result = session.execute(code);
+      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (session != null) {
+        try {
+          session.stop();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkExample.java
new file mode 100644
index 0000000..001c603
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/SparkExample.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.ZSession;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Shows how to run spark code (scala, sql, python, r) through the session api
+ * ({@link ZSession}).
+ *
+ * <p>The session is started without a message handler. Scala code goes to the session's
+ * default interpreter, the other languages are addressed through the two-argument
+ * {@code execute} with a sub interpreter name.
+ */
+public class SparkExample {
+
+  public static void main(String[] args) {
+    ZSession session = null;
+    try {
+      ClientConfig config = new ClientConfig("http://localhost:8080");
+      Map<String, String> interpreterProps = new HashMap<>();
+      interpreterProps.put("spark.master", "local[*]");
+      session = ZSession.builder()
+          .setClientConfig(config)
+          .setInterpreter("spark")
+          .setIntpProperties(interpreterProps)
+          .build();
+
+      session.start();
+      System.out.println("Spark Web UI: " + session.getWeburl());
+
+      // scala, one result
+      ExecuteResult version = session.execute("println(sc.version)");
+      System.out.println("Spark Version: " + version.getResults().get(0).getData());
+
+      // scala, two results: text output first, table output second
+      String scalaCode = "println(sc.version)\n" +
+          "val df = spark.createDataFrame(Seq((1,\"a\"), (2,\"b\")))\n" +
+          "z.show(df)";
+      ExecuteResult scala = session.execute(scalaCode);
+      String firstLine = "Result 1: type: " + scala.getResults().get(0).getType()
+          + ", data: " + scala.getResults().get(0).getData();
+      System.out.println(firstLine);
+      String secondLine = "Result 2: type: " + scala.getResults().get(1).getType()
+          + ", data: " + scala.getResults().get(1).getData();
+      System.out.println(secondLine);
+      System.out.println("Spark Job Urls:\n" + StringUtils.join(scala.getJobUrls(), "\n"));
+
+      // error output
+      ExecuteResult division = session.execute("1/0");
+      String divisionLine = "Result status: " + division.getStatus()
+          + ", data: " + division.getResults().get(0).getData();
+      System.out.println(divisionLine);
+
+      // pyspark; registers the temp table 'df' that the sql statement below selects from
+      String pysparkCode = "df = spark.createDataFrame([(1,'a'),(2,'b')])\n" +
+          "df.registerTempTable('df')\n" +
+          "df.show()";
+      ExecuteResult pyspark = session.execute("pyspark", pysparkCode);
+      System.out.println("PySpark dataframe: " + pyspark.getResults().get(0).getData());
+
+      // matplotlib
+      String plotCode = "%matplotlib inline\n" +
+          "import matplotlib.pyplot as plt\n" +
+          "plt.plot([1,2,3,4])\n" +
+          "plt.ylabel('some numbers')\n" +
+          "plt.show()";
+      ExecuteResult plot = session.execute("ipyspark", plotCode);
+      String plotLine = "Matplotlib result, type: " + plot.getResults().get(0).getType()
+          + ", data: " + plot.getResults().get(0).getData();
+      System.out.println(plotLine);
+
+      // sparkr
+      ExecuteResult sparkr = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
+      System.out.println("Sparkr dataframe: " + sparkr.getResults().get(0).getData());
+
+      // spark sql
+      ExecuteResult sql = session.execute("sql", "select * from df");
+      System.out.println("Spark Sql dataframe: " + sql.getResults().get(0).getData());
+
+      // spark sql against a table that is not expected to exist
+      ExecuteResult invalidSql = session.execute("sql", "select * from unknown_table");
+      String invalidSqlLine = "Result status: " + invalidSql.getStatus()
+          + ", data: " + invalidSql.getResults().get(0).getData();
+      System.out.println(invalidSqlLine);
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      // Always try to stop a session that was created; a failure here is only printed.
+      try {
+        if (session != null) {
+          session.stop();
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample.java
new file mode 100644
index 0000000..6dbae58
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+
+/**
+ * Shows how to create a note, add and run paragraphs and run the whole note through
+ * {@link ZeppelinClient} (low level api). The note created here is deleted at the end.
+ */
+public class ZeppelinClientExample {
+
+  public static void main(String[] args) throws Exception {
+    ZeppelinClient client = new ZeppelinClient(new ClientConfig("http://localhost:8080"));
+    System.out.println("Zeppelin version: " + client.getVersion());
+
+    // Stays null until the note exists, so the cleanup below knows whether to delete it.
+    String noteId = null;
+    try {
+      noteId = client.createNote("/zeppelin_client_examples/note_1");
+      System.out.println("Created note: " + noteId);
+
+      // first paragraph: add it and execute it in a blocking way
+      String firstParagraphId =
+          client.addParagraph(noteId, "the first paragraph", "%python print('hello world')");
+      ParagraphResult firstResult = client.executeParagraph(noteId, firstParagraphId);
+      System.out.println("Added new paragraph and execute it.");
+      System.out.println("Paragraph result: " + firstResult);
+
+      // second paragraph: submit it, cancel it once it is running, wait for the outcome
+      String secondParagraphId = client.addParagraph(noteId, "the second paragraph",
+          "%python\nimport time\ntime.sleep(5)\nprint('done')");
+      client.submitParagraph(noteId, secondParagraphId);
+      client.waitUtilParagraphRunning(noteId, secondParagraphId);
+      client.cancelParagraph(noteId, secondParagraphId);
+      ParagraphResult secondResult = client.waitUtilParagraphFinish(noteId, secondParagraphId);
+      System.out.println("Added new paragraph, submit it then cancel it");
+      System.out.println("Paragraph result: " + secondResult);
+
+      // whole note, blocking
+      NoteResult executed = client.executeNote(noteId);
+      System.out.println("Execute note and the note result: " + executed);
+
+      // whole note, submit and then wait
+      client.submitNote(noteId);
+      NoteResult submitted = client.waitUntilNoteFinished(noteId);
+      System.out.println("Submit note and the note result: " + submitted);
+    } finally {
+      if (noteId != null) {
+        client.deleteNote(noteId);
+        System.out.println("Note " + noteId + " is deleted");
+      }
+    }
+  }
+}
diff --git a/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample2.java b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample2.java
new file mode 100644
index 0000000..05e183c
--- /dev/null
+++ b/zeppelin-client-examples/src/main/java/org/apache/zeppelin/client/examples/ZeppelinClientExample2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.examples;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Basic example of running existing note via ZeppelinClient (low level api).
+ *
+ * <p>The note and paragraph ids used here are hard-coded; the printed messages refer to
+ * them as the spark tutorial note, so that note has to exist on the target server.
+ */
+public class ZeppelinClientExample2 {
+
+  // Id of the existing note that every call below operates on.
+  private static final String NOTE_ID = "2A94M5J1Z";
+  // Interpreter that is stopped after the paragraphs were run one by one.
+  private static final String INTERPRETER = "spark";
+
+  public static void main(String[] args) throws Exception {
+    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+    ZeppelinClient zClient = new ZeppelinClient(clientConfig);
+
+    String zeppelinVersion = zClient.getVersion();
+    System.out.println("Zeppelin version: " + zeppelinVersion);
+
+    // execute the note paragraph by paragraph
+    try {
+      ParagraphResult paragraphResult = zClient.executeParagraph(NOTE_ID, "20150210-015259_1403135953");
+      System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);
+
+      paragraphResult = zClient.executeParagraph(NOTE_ID, "20150210-015302_1492795503");
+      System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);
+
+      Map<String, String> parameters = new HashMap<>();
+      parameters.put("maxAge", "40");
+      paragraphResult = zClient.executeParagraph(NOTE_ID, "20150212-145404_867439529", parameters);
+      System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);
+
+      parameters = new HashMap<>();
+      parameters.put("marital", "married");
+      paragraphResult = zClient.executeParagraph(NOTE_ID, "20150213-230422_1600658137", parameters);
+      System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);
+    } finally {
+      // you need to stop interpreter explicitly if you are running paragraph separately.
+      zClient.stopInterpreter(NOTE_ID, INTERPRETER);
+    }
+
+    // execute this whole note, this note will run under a dedicated interpreter process which will be
+    // stopped after note execution.
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("maxAge", "40");
+    parameters.put("marital", "married");
+    NoteResult noteResult = zClient.executeNote(NOTE_ID, parameters);
+    System.out.println("Execute the spark tutorial note, note result: " + noteResult);
+  }
+}
diff --git a/zeppelin-client-examples/src/main/resources/init_stream.scala b/zeppelin-client-examples/src/main/resources/init_stream.scala
new file mode 100644
index 0000000..c81e18a
--- /dev/null
+++ b/zeppelin-client-examples/src/main/resources/init_stream.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
+import java.util.Collections
+import scala.collection.JavaConversions._
+
+// Init script that registers a small generated click-log stream as table "log".
+// NOTE(review): senv and stenv are not defined in this script; presumably they are the
+// stream execution / stream table environments provided by the Flink interpreter - confirm.
+
+// Use event time and enable checkpointing with an interval argument of 5000.
+senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+senv.enableCheckpointing(5000)
+
+// Source emitting (timestamp, page) tuples; its emitted-record counter is the checkpointed state.
+val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
+
+ // Pages are picked round-robin from this list, so "product" is emitted most often.
+ val pages = Seq("home", "search", "search", "product", "product", "product")
+ // Number of records emitted so far; also the value that is snapshotted and restored.
+ var count: Long = 0
+ // Cleared by cancel() to end the emit loop.
+ var running : Boolean = true
+ // startTime is 2018/1/1
+ var startTime: Long = new java.util.Date(2018 - 1900,0,1).getTime
+ // Pause after each record and also the spacing between generated timestamps.
+ var sleepInterval = 500
+
+ // Emits records until 60 have been produced or the source is cancelled.
+ override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
+ val lock = ctx.getCheckpointLock
+
+ while (count < 60 && running) {
+ // Emit and advance the counter while holding the checkpoint lock; note that the
+ // sleep also happens while the lock is held.
+ lock.synchronized({
+ ctx.collect((startTime + count * sleepInterval, pages(count.toInt % pages.size)))
+ count += 1
+ Thread.sleep(sleepInterval)
+ })
+ }
+ }
+
+ // Makes the loop in run() stop at its next condition check.
+ override def cancel(): Unit = {
+ running = false
+ }
+
+ // State is a single-element list holding the current count.
+ override def snapshotState(checkpointId: Long, timestamp: Long): java.util.List[java.lang.Long] = {
+ Collections.singletonList(count)
+ }
+
+ // Assigns every restored element to count, so the last element wins.
+ override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
+ state.foreach(s => count = s)
+ }
+
+// The first tuple field is used as the (ascending) timestamp of each record.
+}).assignAscendingTimestamps(_._1)
+
+// Register the stream as "log" with fields 'time and 'url plus a 'rowtime rowtime attribute.
+stenv.registerDataStream("log", data, 'time, 'url, 'rowtime.rowtime)
diff --git a/zeppelin-client-examples/src/main/resources/log4j.properties b/zeppelin-client-examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8daee59
--- /dev/null
+++ b/zeppelin-client-examples/src/main/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Root logger: level INFO, output goes to the appender named "stdout".
+log4j.rootLogger = INFO, stdout
+
+# "stdout" writes to the console using a pattern layout.
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+# Pattern: level (padded to 5), [date], ({thread} file[method]:line) - message, newline.
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
diff --git a/zeppelin-client/pom.xml b/zeppelin-client/pom.xml
new file mode 100644
index 0000000..5fab859
--- /dev/null
+++ b/zeppelin-client/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-client</artifactId>
+ <packaging>jar</packaging>
+ <version>0.9.0-SNAPSHOT</version>
+ <name>Zeppelin: Client</name>
+ <description>Zeppelin Client</description>
+
+ <properties>
+ <unirest.version>3.7.04</unirest.version>
+ <commons-text.version>1.8</commons-text.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- Declared exactly once; a second identical websocket-client declaration was removed. -->
+ <!-- ${jetty.version} is not defined in this pom and has to come from the parent. -->
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-client</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.konghq</groupId>
+ <artifactId>unirest-java</artifactId>
+ <version>${unirest.version}</version>
+ <classifier>standalone</classifier>
+ </dependency>
+
+ <!-- No version given here; it has to be managed by the parent pom. -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ <version>${commons-text.version}</version>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <!-- test only -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ClientConfig.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ClientConfig.java
new file mode 100644
index 0000000..527e033
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ClientConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client;
+
+/**
+ * Configuration of Zeppelin client, such as zeppelin server rest url and
+ * query interval of polling note/paragraph result.
+ *
+ * <p>Instances are immutable: every field is assigned exactly once in the constructor.
+ */
+public class ClientConfig {
+  private final String zeppelinRestUrl;
+  private final long queryInterval;
+  private final boolean useKnox;
+
+  /**
+   * Creates a config with a query interval of 1000 and knox disabled.
+   *
+   * @param zeppelinRestUrl rest url of the zeppelin server, must not be null
+   * @throws NullPointerException if {@code zeppelinRestUrl} is null
+   */
+  public ClientConfig(String zeppelinRestUrl) {
+    this(zeppelinRestUrl, 1000);
+  }
+
+  /**
+   * Creates a config with knox disabled.
+   *
+   * @param zeppelinRestUrl rest url of the zeppelin server, must not be null
+   * @param queryInterval interval used when polling for results
+   *                      (presumably milliseconds - confirm against the polling code)
+   * @throws NullPointerException if {@code zeppelinRestUrl} is null
+   */
+  public ClientConfig(String zeppelinRestUrl, long queryInterval) {
+    this(zeppelinRestUrl, queryInterval, false);
+  }
+
+  /**
+   * Creates a config. One trailing slash of the url, if present, is removed.
+   *
+   * @param zeppelinRestUrl rest url of the zeppelin server, must not be null
+   * @param queryInterval interval used when polling for results
+   *                      (presumably milliseconds - confirm against the polling code)
+   * @param useKnox value returned by {@link #isUseKnox()}
+   * @throws NullPointerException if {@code zeppelinRestUrl} is null
+   */
+  public ClientConfig(String zeppelinRestUrl, long queryInterval, boolean useKnox) {
+    // Fail here with a clear message instead of with a bare NPE inside the helper.
+    java.util.Objects.requireNonNull(zeppelinRestUrl, "zeppelinRestUrl must not be null");
+    this.zeppelinRestUrl = removeTrailingSlash(zeppelinRestUrl);
+    this.queryInterval = queryInterval;
+    this.useKnox = useKnox;
+  }
+
+  // Drops a single trailing "/" so that paths can be appended with a leading "/".
+  private static String removeTrailingSlash(String url) {
+    if (url.endsWith("/")) {
+      return url.substring(0, url.length() - 1);
+    }
+    return url;
+  }
+
+  /** Returns the rest url without the trailing slash that was removed on construction. */
+  public String getZeppelinRestUrl() {
+    return zeppelinRestUrl;
+  }
+
+  /** Returns the query interval passed to the constructor (1000 by default). */
+  public long getQueryInterval() {
+    return queryInterval;
+  }
+
+  /** Returns the knox flag passed to the constructor (false by default). */
+  public boolean isUseKnox() {
+    return useKnox;
+  }
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ExecuteResult.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ExecuteResult.java
new file mode 100644
index 0000000..c71bf06
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ExecuteResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+/**
+ * Execution result of one statement run through a ZSession. It is an immutable snapshot
+ * copied from a {@link ParagraphResult}; the statement id is the id of that paragraph.
+ */
+public class ExecuteResult {
+
+  // id of the paragraph that backs this statement
+  private final String statementId;
+  private final Status status;
+  // each statement may return multiple results
+  private final List<Result> results;
+  // if there's any job in the statement, then it will also contain job urls.
+  // e.g. spark job url
+  private final List<String> jobUrls;
+  // if there's any job in the statement, then it will also contain a progress
+  // range from 0 to 100. e.g. spark job progress.
+  private final int progress;
+
+  /**
+   * @param paragraphResult paragraph result to copy id, status, progress, results and
+   *                        job urls from; the lists are shared, not copied
+   */
+  public ExecuteResult(ParagraphResult paragraphResult) {
+    this.statementId = paragraphResult.getParagraphId();
+    this.status = paragraphResult.getStatus();
+    this.progress = paragraphResult.getProgress();
+    this.results = paragraphResult.getResults();
+    this.jobUrls = paragraphResult.getJobUrls();
+  }
+
+  public String getStatementId() {
+    return statementId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public List<Result> getResults() {
+    return results;
+  }
+
+  public List<String> getJobUrls() {
+    return jobUrls;
+  }
+
+  public int getProgress() {
+    return progress;
+  }
+
+  @Override
+  public String toString() {
+    // statementId is included because it is the handle needed to query or cancel
+    // the statement later on.
+    return "ExecuteResult{" +
+            "statementId='" + statementId + '\'' +
+            ", status=" + status +
+            ", progress=" + progress +
+            ", results=" + StringUtils.join(results, ", ") +
+            ", jobUrls=" + jobUrls +
+            '}';
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/NoteResult.java
similarity index 52%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/NoteResult.java
index 8a6d0d0..b0cde11 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/NoteResult.java
@@ -14,32 +14,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.rest.message;
-import com.google.gson.Gson;
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client;
+
+import java.util.List;
/**
- * RestartInterpreter rest api request message.
+ * Represent the note execution result.
*/
-public class RestartInterpreterRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
-
- String noteId;
-
- public RestartInterpreterRequest() {
+public class NoteResult {
+ private String noteId;
+ private boolean isRunning;
+ private List<ParagraphResult> paragraphResultList;
+
+ public NoteResult(String noteId, boolean isRunning, List<ParagraphResult> paragraphResultList) {
+ this.noteId = noteId;
+ this.isRunning = isRunning;
+ this.paragraphResultList = paragraphResultList;
}
public String getNoteId() {
return noteId;
}
- public String toJson() {
- return gson.toJson(this);
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ public List<ParagraphResult> getParagraphResultList() {
+ return paragraphResultList;
}
- public static RestartInterpreterRequest fromJson(String json) {
- return gson.fromJson(json, RestartInterpreterRequest.class);
+ @Override
+ public String toString() {
+ return "NoteResult{" +
+ "noteId='" + noteId + '\'' +
+ ", isRunning=" + isRunning +
+ ", paragraphResultList=" + paragraphResultList +
+ '}';
}
}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ParagraphResult.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ParagraphResult.java
new file mode 100644
index 0000000..a28c604
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ParagraphResult.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.json.JSONArray;
+import kong.unirest.json.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Represent the paragraph execution result, parsed from the paragraph json.
+ *
+ */
+public class ParagraphResult {
+  private final String paragraphId;
+  private final Status status;
+  // if there's any job in the statement, then it will also contain a progress
+  // range from 0 to 100. e.g. spark job progress.
+  private final int progress;
+  // each paragraph may return multiple results
+  private final List<Result> results;
+  // if there's any job in the statement, then it will also contain job urls.
+  // e.g. spark job url
+  private final List<String> jobUrls;
+
+  /**
+   * Parses a paragraph json object. The fields "id", "status" and "progress" are
+   * required; "results" and "runtimeInfos" are optional and yield empty lists when absent.
+   *
+   * @param paragraphJson json object of one paragraph
+   */
+  public ParagraphResult(JSONObject paragraphJson) {
+    this.paragraphId = paragraphJson.getString("id");
+    this.status = Status.valueOf(paragraphJson.getString("status"));
+    this.progress = paragraphJson.getInt("progress");
+    this.results = parseResults(paragraphJson);
+    this.jobUrls = parseJobUrls(paragraphJson);
+  }
+
+  /** Reads every entry of results.msg into a Result; empty when results or msg is absent. */
+  private static List<Result> parseResults(JSONObject paragraphJson) {
+    List<Result> parsed = new ArrayList<>();
+    if (!paragraphJson.has("results")) {
+      return parsed;
+    }
+    JSONObject resultJson = paragraphJson.getJSONObject("results");
+    // a results object without any message must not fail the whole parsing
+    if (!resultJson.has("msg")) {
+      return parsed;
+    }
+    JSONArray msgArray = resultJson.getJSONArray("msg");
+    for (int i = 0; i < msgArray.length(); ++i) {
+      parsed.add(new Result(msgArray.getJSONObject(i)));
+    }
+    return parsed;
+  }
+
+  /** Collects runtimeInfos.jobUrl.values[*].jobUrl; empty when any level is absent. */
+  private static List<String> parseJobUrls(JSONObject paragraphJson) {
+    List<String> urls = new ArrayList<>();
+    if (!paragraphJson.has("runtimeInfos")) {
+      return urls;
+    }
+    JSONObject runtimeInfosJson = paragraphJson.getJSONObject("runtimeInfos");
+    if (!runtimeInfosJson.has("jobUrl")) {
+      return urls;
+    }
+    JSONObject jobUrlJson = runtimeInfosJson.getJSONObject("jobUrl");
+    if (!jobUrlJson.has("values")) {
+      return urls;
+    }
+    JSONArray valuesArray = jobUrlJson.getJSONArray("values");
+    for (int i = 0; i < valuesArray.length(); ++i) {
+      JSONObject object = valuesArray.getJSONObject(i);
+      if (object.has("jobUrl")) {
+        urls.add(object.getString("jobUrl"));
+      }
+    }
+    return urls;
+  }
+
+  public String getParagraphId() {
+    return paragraphId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public int getProgress() {
+    return progress;
+  }
+
+  public List<Result> getResults() {
+    return results;
+  }
+
+  public List<String> getJobUrls() {
+    return jobUrls;
+  }
+
+  /**
+   * Get results in text format.
+   *
+   * @return the data of every result, each followed by a newline; empty string when
+   *         there is no result
+   */
+  public String getResultInText() {
+    StringBuilder builder = new StringBuilder();
+    if (results != null) {
+      for (Result result : results) {
+        builder.append(result.getData()).append("\n");
+      }
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public String toString() {
+    return "ParagraphResult{" +
+            "paragraphId='" + paragraphId + '\'' +
+            ", status=" + status +
+            ", results=" + StringUtils.join(results, ", ") +
+            ", progress=" + progress +
+            ", jobUrls=" + jobUrls +
+            '}';
+  }
+
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Result.java
similarity index 53%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/Result.java
index 9cf1df1..e1c5b90 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Result.java
@@ -14,39 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.rest.message;
-import com.google.gson.Gson;
-import java.util.List;
+package org.apache.zeppelin.client;
-import org.apache.zeppelin.common.JsonSerializable;
+import kong.unirest.json.JSONObject;
/**
- * NewNoteRequest rest api request message.
+ * Represents one segment of a paragraph result. The result of a paragraph can consist of
+ * multiple Results.
*/
-public class NewNoteRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
+public class Result {
+ private String type;
+ private String data;
- String name;
- List<NewParagraphRequest> paragraphs;
-
- public NewNoteRequest (){
+ public Result(JSONObject jsonObject) {
+ this.type = jsonObject.getString("type");
+ this.data = jsonObject.getString("data");
}
- public String getName() {
- return name;
+ public Result(String type, String data) {
+ this.type = type;
+ this.data = data;
}
- public List<NewParagraphRequest> getParagraphs() {
- return paragraphs;
+ public String getType() {
+ return type;
}
- public String toJson() {
- return gson.toJson(this);
+ public String getData() {
+ return data;
}
- public static NewNoteRequest fromJson(String json) {
- return gson.fromJson(json, NewNoteRequest.class);
+ @Override
+ public String toString() {
+ return "Result{" +
+ "type='" + type + '\'' +
+ ", data='" + data + '\'' +
+ '}';
}
}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionInfo.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionInfo.java
new file mode 100644
index 0000000..4eac5c4
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/SessionInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.json.JSONObject;
+
+
+/**
+ * Represent each ZSession info. Every field except the one(s) supplied at construction
+ * time is null.
+ */
+public class SessionInfo {
+
+  private final String sessionId;
+  private final String noteId;
+  private final String interpreter;
+  private final String state;
+  private final String weburl;
+  private final String startTime;
+
+  /**
+   * Creates an info object that only knows its session id; all other fields are null.
+   *
+   * @param sessionId id of the session
+   */
+  public SessionInfo(String sessionId) {
+    this.sessionId = sessionId;
+    this.noteId = null;
+    this.interpreter = null;
+    this.state = null;
+    this.weburl = null;
+    this.startTime = null;
+  }
+
+  /**
+   * Creates an info object from the session json. Every key is optional; a missing key
+   * leaves the corresponding field null.
+   *
+   * @param sessionJson json object of one session
+   */
+  public SessionInfo(JSONObject sessionJson) {
+    this.sessionId = stringOrNull(sessionJson, "sessionId");
+    this.noteId = stringOrNull(sessionJson, "noteId");
+    this.interpreter = stringOrNull(sessionJson, "interpreter");
+    this.state = stringOrNull(sessionJson, "state");
+    this.weburl = stringOrNull(sessionJson, "weburl");
+    this.startTime = stringOrNull(sessionJson, "startTime");
+  }
+
+  /** Returns the string stored under key, or null when the json has no such key. */
+  private static String stringOrNull(JSONObject json, String key) {
+    return json.has(key) ? json.getString(key) : null;
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public String getNoteId() {
+    return noteId;
+  }
+
+  public String getInterpreter() {
+    return interpreter;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public String getWeburl() {
+    return weburl;
+  }
+
+  public String getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public String toString() {
+    return "SessionInfo{" +
+            "sessionId='" + sessionId + '\'' +
+            ", noteId='" + noteId + '\'' +
+            ", interpreter='" + interpreter + '\'' +
+            ", state='" + state + '\'' +
+            ", weburl='" + weburl + '\'' +
+            ", startTime='" + startTime + '\'' +
+            '}';
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Status.java
similarity index 54%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/Status.java
index 9cf1df1..a22a026 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/Status.java
@@ -14,39 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.rest.message;
-import com.google.gson.Gson;
-import java.util.List;
-
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client;
/**
- * NewNoteRequest rest api request message.
+ * Job status.
+ *
+ * UNKNOWN - Job is not found in remote
+ * READY - Job is not running, ready to run.
+ * PENDING - Job is submitted to the scheduler but is not running yet.
+ * RUNNING - Job is running.
+ * FINISHED - Job finished running with success.
+ * ERROR - Job finished running with an error.
+ * ABORT - Job finished by abort
*/
-public class NewNoteRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
-
- String name;
- List<NewParagraphRequest> paragraphs;
-
- public NewNoteRequest (){
- }
+public enum Status {
+ UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
- public String getName() {
- return name;
+ public boolean isReady() {
+ return this == READY;
}
- public List<NewParagraphRequest> getParagraphs() {
- return paragraphs;
+ public boolean isRunning() {
+ return this == RUNNING;
}
- public String toJson() {
- return gson.toJson(this);
+ public boolean isPending() {
+ return this == PENDING;
}
- public static NewNoteRequest fromJson(String json) {
- return gson.fromJson(json, NewNoteRequest.class);
+ public boolean isCompleted() {
+ return this == FINISHED || this == ERROR || this == ABORT;
}
}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
new file mode 100644
index 0000000..ffedf40
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.client.websocket.Message;
+import org.apache.zeppelin.client.websocket.MessageHandler;
+import org.apache.zeppelin.client.websocket.StatementMessageHandler;
+import org.apache.zeppelin.client.websocket.ZeppelinWebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Each ZSession represent one interpreter process, you can start/stop it, and execute/submit/cancel code.
+ * There's no Zeppelin concept(like note/paragraph) in ZSession.
+ *
+ */
+public class ZSession {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZSession.class);
+
+ private ZeppelinClient zeppelinClient;
+ private String interpreter;
+ private Map<String, String> intpProperties;
+ // max number of retained statements, each statement represent one paragraph.
+ private int maxStatement;
+
+ private SessionInfo sessionInfo;
+
+ private ZeppelinWebSocketClient webSocketClient;
+
+ public ZSession(ClientConfig clientConfig,
+ String interpreter) throws Exception {
+ this(clientConfig, interpreter, new HashMap<>(), 100);
+ }
+
+ public ZSession(ClientConfig clientConfig,
+ String interpreter,
+ Map<String, String> intpProperties,
+ int maxStatement) throws Exception {
+ this.zeppelinClient = new ZeppelinClient(clientConfig);
+ this.interpreter = interpreter;
+ this.intpProperties = intpProperties;
+ this.maxStatement = maxStatement;
+ }
+
+ private ZSession(ClientConfig clientConfig,
+ String interpreter,
+ String sessionId) throws Exception {
+ this.zeppelinClient = new ZeppelinClient(clientConfig);
+ this.interpreter = interpreter;
+ this.sessionInfo = new SessionInfo(sessionId);
+ }
+
+ /**
+ * Start this ZSession, underneath it would create a note for this ZSession and
+ * start a dedicated interpreter group.
+ * This method won't establish websocket connection.
+ *
+ * @throws Exception
+ */
+ public void start() throws Exception {
+ start(null);
+ }
+
+ /**
+ * Start this ZSession, underneath it would create a note for this ZSession and
+ * start a dedicated interpreter process.
+ *
+ * @param messageHandler
+ * @throws Exception
+ */
+ public void start(MessageHandler messageHandler) throws Exception {
+ this.sessionInfo = zeppelinClient.newSession(interpreter);
+
+ // inline configuration
+ StringBuilder builder = new StringBuilder("%" + interpreter + ".conf\n");
+ if (intpProperties != null) {
+ for (Map.Entry<String, String> entry : intpProperties.entrySet()) {
+ builder.append(entry.getKey() + " " + entry.getValue() + "\n");
+ }
+ }
+ String paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Configuration", builder.toString());
+ ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
+ if (paragraphResult.getStatus() != Status.FINISHED) {
+ throw new Exception("Fail to configure session, " + paragraphResult.getResultInText());
+ }
+
+ // start session
+ // add local properties (init) to avoid skip empty paragraph.
+ paragraphId = zeppelinClient.addParagraph(getNoteId(), "Session Init", "%" + interpreter + "(init=true)");
+ paragraphResult = zeppelinClient.executeParagraph(getNoteId(), paragraphId, getSessionId());
+ if (paragraphResult.getStatus() != Status.FINISHED) {
+ throw new Exception("Fail to init session, " + paragraphResult.getResultInText());
+ }
+ this.sessionInfo = zeppelinClient.getSession(getSessionId());
+
+ if (messageHandler != null) {
+ this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
+ this.webSocketClient.connect(zeppelinClient.getClientConfig().getZeppelinRestUrl()
+ .replace("https", "ws").replace("http", "ws") + "/ws");
+
+ // call GET_NOTE to establish websocket connection between this session and zeppelin-server
+ Message msg = new Message(Message.OP.GET_NOTE);
+ msg.put("id", getNoteId());
+ this.webSocketClient.send(msg);
+ }
+ }
+
+ /**
+ * Stop this underlying interpreter process.
+ *
+ * @throws Exception
+ */
+ public void stop() throws Exception {
+ if (getSessionId() != null) {
+ zeppelinClient.stopSession(getSessionId());
+ }
+ if (webSocketClient != null) {
+ webSocketClient.stop();
+ }
+ }
+
+ /**
+ * Session has been started in ZeppelinServer, this method is just to reconnect it.
+ * This method is used for connect to an existing session in ZeppelinServer, instead of
+ * start it from ZSession.
+ * @throws Exception
+ */
+ public static ZSession createFromExistingSession(ClientConfig clientConfig,
+ String interpreter,
+ String sessionId) throws Exception {
+ return createFromExistingSession(clientConfig, interpreter, sessionId, null);
+ }
+
+ /**
+ * Session has been started in ZeppelinServer, this method is just to reconnect it.
+ * This method is used for connect to an existing session in ZeppelinServer, instead of
+ * start it from ZSession.
+ * @throws Exception
+ */
+ public static ZSession createFromExistingSession(ClientConfig clientConfig,
+ String interpreter,
+ String sessionId,
+ MessageHandler messageHandler) throws Exception {
+ ZSession session = new ZSession(clientConfig, interpreter, sessionId);
+ session.reconnect(messageHandler);
+ return session;
+ }
+
+ private void reconnect(MessageHandler messageHandler) throws Exception {
+ this.sessionInfo = this.zeppelinClient.getSession(getSessionId());
+ if (!sessionInfo.getState().equalsIgnoreCase("Running")) {
+ throw new Exception("Session " + getSessionId() + " is not running, state: " + sessionInfo.getState());
+ }
+
+ if (messageHandler != null) {
+ this.webSocketClient = new ZeppelinWebSocketClient(messageHandler);
+ this.webSocketClient.connect(zeppelinClient.getClientConfig().getZeppelinRestUrl()
+ .replace("https", "ws").replace("http", "ws") + "/ws");
+
+ // call GET_NOTE to establish websocket connection between this session and zeppelin-server
+ Message msg = new Message(Message.OP.GET_NOTE);
+ msg.put("id", getNoteId());
+ this.webSocketClient.send(msg);
+ }
+ }
+
+ /**
+ * Run code in non-blocking way.
+ *
+ * @param code
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult execute(String code) throws Exception {
+ return execute("", code);
+ }
+
+ /**
+ * Run code in non-blocking way.
+ *
+ * @param code
+ * @param messageHandler
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult execute(String code, StatementMessageHandler messageHandler) throws Exception {
+ return execute("", code, messageHandler);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param code
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult execute(String subInterpreter, String code) throws Exception {
+ return execute(subInterpreter, new HashMap<>(), code);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param code
+ * @param messageHandler
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult execute(String subInterpreter, String code, StatementMessageHandler messageHandler) throws Exception {
+ return execute(subInterpreter, new HashMap<>(), code, messageHandler);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param localProperties
+ * @param code
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult execute(String subInterpreter,
+ Map<String, String> localProperties,
+ String code) throws Exception {
+ return execute(subInterpreter, localProperties, code, null);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param localProperties
+ * @param code
+ * @param messageHandler
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult execute(String subInterpreter,
+ Map<String, String> localProperties,
+ String code,
+ StatementMessageHandler messageHandler) throws Exception {
+ StringBuilder builder = new StringBuilder("%" + interpreter);
+ if (!StringUtils.isBlank(subInterpreter)) {
+ builder.append("." + subInterpreter);
+ }
+ if (localProperties != null && !localProperties.isEmpty()) {
+ builder.append("(");
+ for (Map.Entry<String, String> entry : localProperties.entrySet()) {
+ builder.append(entry.getKey() + "=\"" + entry.getValue() + "\"");
+ }
+ builder.append(")");
+ }
+ builder.append("\n" + code);
+
+ String text = builder.toString();
+
+ String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
+ zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
+
+ if (messageHandler != null) {
+ webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
+ }
+ ParagraphResult paragraphResult = zeppelinClient.executeParagraph(getNoteId(), nextParagraphId, getSessionId());
+ return new ExecuteResult(paragraphResult);
+ }
+
+ /**
+ *
+ * @param code
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult submit(String code) throws Exception {
+ return submit("", code);
+ }
+
+ /**
+ *
+ * @param code
+ * @param messageHandler
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult submit(String code, StatementMessageHandler messageHandler) throws Exception {
+ return submit("", code, messageHandler);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param code
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult submit(String subInterpreter, String code) throws Exception {
+ return submit(subInterpreter, new HashMap<>(), code);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param code
+ * @param messageHandler
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult submit(String subInterpreter, String code, StatementMessageHandler messageHandler) throws Exception {
+ return submit(subInterpreter, new HashMap<>(), code, messageHandler);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param code
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult submit(String subInterpreter, Map<String, String> localProperties, String code) throws Exception {
+ return submit(subInterpreter, localProperties, code, null);
+ }
+
+ /**
+ *
+ * @param subInterpreter
+ * @param code
+ * @param messageHandler
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult submit(String subInterpreter,
+ Map<String, String> localProperties,
+ String code,
+ StatementMessageHandler messageHandler) throws Exception {
+ StringBuilder builder = new StringBuilder("%" + interpreter);
+ if (!StringUtils.isBlank(subInterpreter)) {
+ builder.append("." + subInterpreter);
+ }
+ if (localProperties != null && !localProperties.isEmpty()) {
+ builder.append("(");
+ for (Map.Entry<String, String> entry : localProperties.entrySet()) {
+ builder.append(entry.getKey() + "=\"" + entry.getValue() + "\"");
+ }
+ builder.append(")");
+ }
+ builder.append("\n" + code);
+
+ String text = builder.toString();
+ String nextParagraphId = zeppelinClient.nextSessionParagraph(getNoteId(), maxStatement);
+ zeppelinClient.updateParagraph(getNoteId(), nextParagraphId, "", text);
+ if (messageHandler != null) {
+ webSocketClient.addStatementMessageHandler(nextParagraphId, messageHandler);
+ }
+ ParagraphResult paragraphResult = zeppelinClient.submitParagraph(getNoteId(), nextParagraphId, getSessionId());
+ return new ExecuteResult(paragraphResult);
+ }
+
+ /**
+ *
+ * @param statementId
+ * @throws Exception
+ */
+ public void cancel(String statementId) throws Exception {
+ zeppelinClient.cancelParagraph(getNoteId(), statementId);
+ }
+
+ /**
+ *
+ * @param statementId
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult queryStatement(String statementId) throws Exception {
+ ParagraphResult paragraphResult = zeppelinClient.queryParagraphResult(getNoteId(), statementId);
+ return new ExecuteResult(paragraphResult);
+ }
+
+ /**
+ *
+ * @param statementId
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult waitUntilFinished(String statementId) throws Exception {
+ ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphFinish(getNoteId(), statementId);
+ return new ExecuteResult(paragraphResult);
+ }
+
+ /**
+ *
+ * @param statementId
+ * @return
+ * @throws Exception
+ */
+ public ExecuteResult waitUntilRunning(String statementId) throws Exception {
+ ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphRunning(getNoteId(), statementId);
+ return new ExecuteResult(paragraphResult);
+ }
+
+ public String getNoteId() {
+ if (this.sessionInfo != null) {
+ return this.sessionInfo.getNoteId();
+ } else {
+ return null;
+ }
+ }
+
+ public String getWeburl() {
+ if (this.sessionInfo != null) {
+ return sessionInfo.getWeburl();
+ } else {
+ return null;
+ }
+ }
+
+ public String getSessionId() {
+ if (this.sessionInfo != null) {
+ return this.sessionInfo.getSessionId();
+ } else {
+ return null;
+ }
+ }
+
+ public String getInterpreter() {
+ return interpreter;
+ }
+
+ public ZeppelinClient getZeppelinClient() {
+ return zeppelinClient;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private ClientConfig clientConfig;
+ private String interpreter;
+ private Map<String, String> intpProperties;
+ private int maxStatement = 100;
+
+ public Builder setClientConfig(ClientConfig clientConfig) {
+ this.clientConfig = clientConfig;
+ return this;
+ }
+
+ public Builder setInterpreter(String interpreter) {
+ this.interpreter = interpreter;
+ return this;
+ }
+
+ public Builder setIntpProperties(Map<String, String> intpProperties) {
+ this.intpProperties = intpProperties;
+ return this;
+ }
+
+ public ZSession build() throws Exception {
+ return new ZSession(clientConfig, interpreter, intpProperties, maxStatement);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ ClientConfig clientConfig = new ClientConfig("http://localhost:8080", 1000);
+// ZSession hiveSession = new ZSession(clientConfig, "hive", new HashMap<>(), 100);
+// hiveSession.start();
+//
+// ExecuteResult executeResult = hiveSession.submit("show tables");
+// executeResult = hiveSession.waitUntilFinished(executeResult.getStatementId());
+// System.out.println(executeResult.toString());
+//
+// executeResult = hiveSession.submit("select eid, count(1) from employee group by eid");
+// executeResult = hiveSession.waitUntilFinished(executeResult.getStatementId());
+// System.out.println(executeResult.toString());
+
+ ZSession sparkSession = ZSession.createFromExistingSession(clientConfig, "hive", "hive_1598418780469");
+ ExecuteResult executeResult = sparkSession.execute("show databases");
+ System.out.println(executeResult);
+
+// ExecuteResult executeResult = sparkSession.submit("sql", "show tables");
+// executeResult = sparkSession.waitUntilFinished(executeResult.getStatementId());
+// System.out.println(executeResult.toString());
+//
+// executeResult = sparkSession.submit("sql", "select eid, count(1) from employee group by eid");
+// executeResult = sparkSession.waitUntilFinished(executeResult.getStatementId());
+// System.out.println(executeResult.toString());
+ }
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java
new file mode 100644
index 0000000..9bb5265
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZeppelinClient.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.client;
+
+import kong.unirest.GetRequest;
+import kong.unirest.HttpResponse;
+import kong.unirest.JsonNode;
+import kong.unirest.Unirest;
+import kong.unirest.apache.ApacheClient;
+import kong.unirest.json.JSONArray;
+import kong.unirest.json.JSONObject;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import unirest.shaded.org.apache.http.client.HttpClient;
+import unirest.shaded.org.apache.http.impl.client.HttpClients;
+
+import javax.net.ssl.SSLContext;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Low level api for interacting with Zeppelin. Underneath, it uses the Zeppelin REST API.
+ * You can use this class to operate Zeppelin note/paragraph,
+ * e.g. get/add/delete/update/execute/cancel
+ */
+public class ZeppelinClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinClient.class);
+
+ private ClientConfig clientConfig;
+
+ /**
+ * Create a client for the Zeppelin server described by <code>clientConfig</code>.
+ * The Unirest default base url is set to <code>clientConfig.getZeppelinRestUrl() + "/api"</code>.
+ * When <code>clientConfig.isUseKnox()</code> is true, Unirest is additionally given a custom http client
+ * whose trust strategy accepts every certificate chain and which uses {@link NoopHostnameVerifier}.
+ *
+ * NOTE(review): all settings are applied through <code>Unirest.config()</code>, which looks like shared
+ * static configuration, so several clients with different configs may overwrite each other - confirm.
+ *
+ * @param clientConfig connection settings of the Zeppelin server
+ * @throws Exception if the custom http client can not be set up
+ */
+ public ZeppelinClient(ClientConfig clientConfig) throws Exception {
+ this.clientConfig = clientConfig;
+ Unirest.config().defaultBaseUrl(clientConfig.getZeppelinRestUrl() + "/api");
+
+ if (clientConfig.isUseKnox()) {
+ try {
+ // isTrusted always returns true, so no certificate chain is ever rejected.
+ // NOTE(review): this effectively turns off TLS certificate validation - confirm it is acceptable.
+ SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustSelfSignedStrategy() {
+ public boolean isTrusted(X509Certificate[] chain, String authType) {
+ return true;
+ }
+ }).build();
+ // NoopHostnameVerifier: the host name of the server is not checked against its certificate either.
+ HttpClient customHttpClient = HttpClients.custom().setSSLContext(sslContext)
+ .setSSLHostnameVerifier(new NoopHostnameVerifier()).build();
+ Unirest.config().httpClient(ApacheClient.builder(customHttpClient));
+ } catch (Exception e) {
+ // Keep the original failure as cause so the root problem stays visible to the caller.
+ throw new Exception("Fail to setup httpclient of Unirest", e);
+ }
+ }
+ }
+
+ /**
+ * Get the {@link ClientConfig} this client was constructed with.
+ *
+ * @return the clientConfig passed to the constructor
+ */
+ public ClientConfig getClientConfig() {
+ return clientConfig;
+ }
+
+ /**
+  * Throw exception if the status code is not 200.
+  * Status 302 is reported as "Please login first". For any other non-200 status the exception
+  * carries the status, the status text and, when available, the <code>message</code> field of the
+  * json body. A missing, non-object or message-less body no longer hides the http status behind a
+  * NullPointerException/JSONException.
+  *
+  * @param response response of a rest api call
+  * @throws Exception if the status code is not 200
+  */
+ private void checkResponse(HttpResponse<JsonNode> response) throws Exception {
+   int status = response.getStatus();
+   if (status == 302) {
+     throw new Exception("Please login first");
+   }
+   if (status != 200) {
+     // The body is null when it could not be parsed as json, and getObject() is null for a json array.
+     String message = "<no message in response body>";
+     JsonNode body = response.getBody();
+     if (body != null && body.getObject() != null) {
+       message = body.getObject().optString("message", message);
+     }
+     throw new Exception(String.format("Unable to call rest api, status: %s, statusText: %s, message: %s",
+         status,
+         response.getStatusText(),
+         message));
+   }
+ }
+
+ /**
+  * Throw exception if the status in the json object is not `OK`.
+  * The exception message is the java-unescaped <code>message</code> field of the json object. A null
+  * node, a node that is not a json object, or a missing <code>status</code>/<code>message</code>
+  * field results in a descriptive exception instead of a NullPointerException/JSONException.
+  *
+  * @param jsonNode parsed body of a rest api response
+  * @throws Exception if the body is unusable or its status is not `OK`
+  */
+ private void checkJsonNodeStatus(JsonNode jsonNode) throws Exception {
+   if (jsonNode == null || jsonNode.getObject() == null) {
+     throw new Exception("Unexpected response from Zeppelin server: body is not a json object");
+   }
+   JSONObject responseJson = jsonNode.getObject();
+   String status = responseJson.optString("status", "");
+   if (!"OK".equalsIgnoreCase(status)) {
+     String message = responseJson.optString("message", "");
+     if (message.isEmpty()) {
+       message = "Zeppelin server returned status: " + status;
+     }
+     throw new Exception(StringEscapeUtils.unescapeJava(message));
+   }
+ }
+
+ /**
+  * Fetch the version of the Zeppelin server via <code>GET /version</code>.
+  *
+  * @return the <code>version</code> field inside the <code>body</code> of the response
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public String getVersion() throws Exception {
+   HttpResponse<JsonNode> versionResponse = Unirest.get("/version").asJson();
+   checkResponse(versionResponse);
+   JsonNode payload = versionResponse.getBody();
+   checkJsonNodeStatus(payload);
+   JSONObject versionBody = payload.getObject().getJSONObject("body");
+   return versionBody.getString("version");
+ }
+
+ /**
+  * Ask the server for a new session id via <code>POST /session</code>. This only creates an unique
+  * session id, it doesn't create the session (interpreter process) in zeppelin server side.
+  *
+  * @param interpreter sent as the <code>interpreter</code> query parameter
+  * @return {@link SessionInfo} built from the <code>body</code> of the response
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public SessionInfo newSession(String interpreter) throws Exception {
+   HttpResponse<JsonNode> sessionResponse =
+       Unirest.post("/session").queryString("interpreter", interpreter).asJson();
+   checkResponse(sessionResponse);
+   JsonNode payload = sessionResponse.getBody();
+   checkJsonNodeStatus(payload);
+   JSONObject sessionJson = payload.getObject().getJSONObject("body");
+   return new SessionInfo(sessionJson);
+ }
+
+ /**
+  * Stop the session (interpreter process) in Zeppelin server via <code>DELETE /session/{sessionId}</code>.
+  *
+  * @param sessionId id of the session to stop
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public void stopSession(String sessionId) throws Exception {
+   HttpResponse<JsonNode> stopResponse =
+       Unirest.delete("/session/{sessionId}").routeParam("sessionId", sessionId).asJson();
+   checkResponse(stopResponse);
+   checkJsonNodeStatus(stopResponse.getBody());
+ }
+
+ /**
+  * Look up a session via <code>GET /session/{sessionId}</code>.
+  *
+  * @param sessionId id of the session to look up
+  * @return {@link SessionInfo} built from the <code>body</code> of the response
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public SessionInfo getSession(String sessionId) throws Exception {
+   HttpResponse<JsonNode> sessionResponse =
+       Unirest.get("/session/{sessionId}").routeParam("sessionId", sessionId).asJson();
+   checkResponse(sessionResponse);
+   JsonNode payload = sessionResponse.getBody();
+   checkJsonNodeStatus(payload);
+   return new SessionInfo(payload.getObject().getJSONObject("body"));
+ }
+
+ /**
+ * List all the sessions.
+ * Delegates to {@link #listSessions(String)} with a null interpreter, so no interpreter filter is sent.
+ *
+ * @return one {@link SessionInfo} per session returned by the server
+ * @throws Exception if the rest call fails or the server reports a non-OK status
+ */
+ public List<SessionInfo> listSessions() throws Exception {
+ return listSessions(null);
+ }
+
+ /**
+ * List all the sessions for the provided interpreter.
+ *
+ * @param interpreter
+ * @return
+ * @throws Exception
+ */
+ public List<SessionInfo> listSessions(String interpreter) throws Exception {
+ GetRequest getRequest = Unirest.get("/session");
+ if (interpreter != null) {
+ getRequest.queryString("interpreter", interpreter);
+ }
+ HttpResponse<JsonNode> response = getRequest.asJson();
+ checkResponse(response);
+ JsonNode jsonNode = response.getBody();
+ checkJsonNodeStatus(jsonNode);
+ JSONArray sessionJsonArray = jsonNode.getObject().getJSONArray("body");
+ List<SessionInfo> sessionInfos = new ArrayList<>();
+ for (int i = 0; i< sessionJsonArray.length();++i) {
+ sessionInfos.add(new SessionInfo(sessionJsonArray.getJSONObject(i)));
+ }
+ return sessionInfos;
+ }
+
+ /**
+ * Login zeppelin with userName and password, throw exception if login fails.
+ *
+ * @param userName
+ * @param password
+ * @throws Exception
+ */
+ public void login(String userName, String password) throws Exception {
+ if (clientConfig.isUseKnox()) {
+ HttpResponse<String> response = Unirest.get("/")
+ .basicAuth(userName, password)
+ .asString();
+ if (response.getStatus() != 200) {
+ throw new Exception(String.format("Login failed, status: %s, statusText: %s",
+ response.getStatus(),
+ response.getStatusText()));
+ }
+ } else {
+ HttpResponse<JsonNode> response = Unirest
+ .post("/login")
+ .field("userName", userName)
+ .field("password", password)
+ .asJson();
+ if (response.getStatus() != 200) {
+ throw new Exception(String.format("Login failed, status: %s, statusText: %s",
+ response.getStatus(),
+ response.getStatusText()));
+ }
+ }
+ }
+
+ /**
+ * Create a new empty note with provided notePath.
+ * Delegates to {@link #createNote(String, String)} with an empty string as defaultInterpreterGroup.
+ *
+ * @param notePath path of the note to create
+ * @return the <code>body</code> field of the server response
+ * @throws Exception if the rest call fails or the server reports a non-OK status
+ */
+ public String createNote(String notePath) throws Exception {
+ return createNote(notePath, "");
+ }
+
+ /**
+  * Create a new empty note via <code>POST /notebook</code>. The request body is a json object with
+  * the fields <code>name</code> (the notePath) and <code>defaultInterpreterGroup</code>.
+  *
+  * @param notePath path of the note to create
+  * @param defaultInterpreterGroup default interpreter group of the new note
+  * @return the <code>body</code> field of the server response
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public String createNote(String notePath, String defaultInterpreterGroup) throws Exception {
+   JSONObject requestJson = new JSONObject();
+   requestJson.put("name", notePath);
+   requestJson.put("defaultInterpreterGroup", defaultInterpreterGroup);
+   String requestBody = requestJson.toString();
+
+   HttpResponse<JsonNode> createResponse = Unirest.post("/notebook").body(requestBody).asJson();
+   checkResponse(createResponse);
+   JsonNode payload = createResponse.getBody();
+   checkJsonNodeStatus(payload);
+   return payload.getObject().getString("body");
+ }
+
+ /**
+  * Delete a note via <code>DELETE /notebook/{noteId}</code>.
+  *
+  * @param noteId id of the note to delete
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public void deleteNote(String noteId) throws Exception {
+   HttpResponse<JsonNode> deleteResponse =
+       Unirest.delete("/notebook/{noteId}").routeParam("noteId", noteId).asJson();
+   checkResponse(deleteResponse);
+   checkJsonNodeStatus(deleteResponse.getBody());
+ }
+
+ /**
+  * Query {@link NoteResult} via <code>GET /notebook/{noteId}</code>.
+  * The running flag is read from <code>info.isRunning</code> of the note json and is false when that
+  * field is absent. One {@link ParagraphResult} is created per element of the <code>paragraphs</code>
+  * array; the list is empty when the note json has no such array.
+  *
+  * @param noteId id of the note to query
+  * @return the running flag and paragraph results of the note
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public NoteResult queryNoteResult(String noteId) throws Exception {
+   HttpResponse<JsonNode> noteResponse =
+       Unirest.get("/notebook/{noteId}").routeParam("noteId", noteId).asJson();
+   checkResponse(noteResponse);
+   JsonNode payload = noteResponse.getBody();
+   checkJsonNodeStatus(payload);
+
+   JSONObject noteJson = payload.getObject().getJSONObject("body");
+
+   JSONObject infoJson = noteJson.has("info") ? noteJson.getJSONObject("info") : null;
+   boolean running = infoJson != null
+       && infoJson.has("isRunning")
+       && Boolean.parseBoolean(infoJson.getString("isRunning"));
+
+   List<ParagraphResult> paragraphs = new ArrayList<>();
+   if (noteJson.has("paragraphs")) {
+     JSONArray paragraphArray = noteJson.getJSONArray("paragraphs");
+     int position = 0;
+     while (position < paragraphArray.length()) {
+       paragraphs.add(new ParagraphResult(paragraphArray.getJSONObject(position)));
+       position++;
+     }
+   }
+
+   return new NoteResult(noteId, running, paragraphs);
+ }
+
+ /**
+ * Execute note with provided noteId, return until note execution is completed.
+ * Interpreter process will be stopped after note execution.
+ * Delegates to {@link #executeNote(String, Map)} with an empty parameter map.
+ *
+ * @param noteId id of the note to execute
+ * @return the {@link NoteResult} queried once the note is no longer running
+ * @throws Exception if submitting or querying the note fails
+ */
+ public NoteResult executeNote(String noteId) throws Exception {
+ return executeNote(noteId, new HashMap<>());
+ }
+
+ /**
+ * Execute note with provided noteId and parameters, return until note execution is completed.
+ * Interpreter process will be stopped after note execution.
+ * This submits the note and then polls via {@link #waitUntilNoteFinished(String)}, i.e. without timeout.
+ *
+ * @param noteId id of the note to execute
+ * @param parameters parameters sent as <code>params</code> when the note is submitted
+ * @return the {@link NoteResult} queried once the note is no longer running
+ * @throws Exception if submitting or querying the note fails
+ */
+ public NoteResult executeNote(String noteId, Map<String, String> parameters) throws Exception {
+ // The NoteResult returned by submitNote is not needed, the final state is queried by the wait call.
+ submitNote(noteId, parameters);
+ return waitUntilNoteFinished(noteId);
+ }
+
+ /**
+ * Submit note to execute with provided noteId, return at once the submission is completed.
+ * You need to query {@link NoteResult} by yourself afterwards until note execution is completed.
+ * Interpreter process will be stopped after note execution.
+ * Delegates to {@link #submitNote(String, Map)} with an empty parameter map.
+ *
+ * @param noteId id of the note to submit
+ * @return the {@link NoteResult} queried right after the submission
+ * @throws Exception if submitting or querying the note fails
+ */
+ public NoteResult submitNote(String noteId) throws Exception {
+ return submitNote(noteId, new HashMap<>());
+ }
+
+ /**
+  * Submit a note for execution via <code>POST /notebook/job/{noteId}</code> with the query parameters
+  * <code>blocking=false</code> and <code>isolated=true</code>, then return the note state queried
+  * right after the submission. You need to query {@link NoteResult} by yourself afterwards until
+  * note execution is completed. Interpreter process will be stopped after note execution.
+  *
+  * @param noteId id of the note to submit
+  * @param parameters sent as the <code>params</code> field of the request body
+  * @return the {@link NoteResult} queried right after the submission
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public NoteResult submitNote(String noteId, Map<String, String> parameters) throws Exception {
+   JSONObject requestJson = new JSONObject();
+   requestJson.put("params", parameters);
+
+   // blocking=false: the call returns without waiting for the note; isolated=true: run it isolated.
+   HttpResponse<JsonNode> submitResponse = Unirest.post("/notebook/job/{noteId}")
+       .routeParam("noteId", noteId)
+       .queryString("blocking", "false")
+       .queryString("isolated", "true")
+       .body(requestJson)
+       .asJson();
+   checkResponse(submitResponse);
+   checkJsonNodeStatus(submitResponse.getBody());
+   return queryNoteResult(noteId);
+ }
+
+ /**
+  * Poll the note until it is reported as not running. Between two queries the calling thread sleeps
+  * for the query interval of the client config. There is no timeout.
+  *
+  * @param noteId id of the note to wait for
+  * @return the first {@link NoteResult} that is not running
+  * @throws Exception if querying the note fails or the thread is interrupted while sleeping
+  */
+ public NoteResult waitUntilNoteFinished(String noteId) throws Exception {
+   NoteResult latest = queryNoteResult(noteId);
+   while (latest.isRunning()) {
+     Thread.sleep(clientConfig.getQueryInterval());
+     latest = queryNoteResult(noteId);
+   }
+   return latest;
+ }
+
+ /**
+  * Block there until note execution is completed, and throw exception if note execution is not completed
+  * in <code>timeoutInMills</code>.
+  * The note is always queried at least once, even for a zero or negative timeout. The sleep between
+  * two queries never extends beyond the deadline, and one last query is done when the deadline is
+  * reached. Elapsed time is measured with <code>System.nanoTime()</code>, so changes of the system
+  * clock do not affect the timeout.
+  *
+  * @param noteId id of the note to wait for
+  * @param timeoutInMills maximum time to wait in milliseconds
+  * @return the first {@link NoteResult} that is not running
+  * @throws Exception if the note is still running after the timeout, querying the note fails or the
+  *                   thread is interrupted while sleeping
+  */
+ public NoteResult waitUntilNoteFinished(String noteId, long timeoutInMills) throws Exception {
+   long startNanos = System.nanoTime();
+   while (true) {
+     NoteResult noteResult = queryNoteResult(noteId);
+     if (!noteResult.isRunning()) {
+       return noteResult;
+     }
+     long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
+     if (elapsedMillis >= timeoutInMills) {
+       break;
+     }
+     // Never sleep past the deadline; the next iteration does the final query.
+     Thread.sleep(Math.min(clientConfig.getQueryInterval(), timeoutInMills - elapsedMillis));
+   }
+   throw new Exception("Note is not finished in " + timeoutInMills / 1000 + " seconds");
+ }
+
+ /**
+  * Add a paragraph via <code>POST /notebook/{noteId}/paragraph</code>. The request body is a json
+  * object with the fields <code>title</code> and <code>text</code>.
+  *
+  * @param noteId id of the note the paragraph is added to
+  * @param title title of the new paragraph
+  * @param text text of the new paragraph
+  * @return the <code>body</code> field of the server response
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public String addParagraph(String noteId, String title, String text) throws Exception {
+   JSONObject requestJson = new JSONObject();
+   requestJson.put("title", title);
+   requestJson.put("text", text);
+   String requestBody = requestJson.toString();
+
+   HttpResponse<JsonNode> addResponse = Unirest.post("/notebook/{noteId}/paragraph")
+       .routeParam("noteId", noteId)
+       .body(requestBody)
+       .asJson();
+   checkResponse(addResponse);
+   JsonNode payload = addResponse.getBody();
+   checkJsonNodeStatus(payload);
+   return payload.getObject().getString("body");
+ }
+
+ /**
+  * Update a paragraph via <code>PUT /notebook/{noteId}/paragraph/{paragraphId}</code>. The request
+  * body is a json object with the fields <code>title</code> and <code>text</code>.
+  *
+  * @param noteId id of the note that contains the paragraph
+  * @param paragraphId id of the paragraph to update
+  * @param title new title of the paragraph
+  * @param text new text of the paragraph
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public void updateParagraph(String noteId, String paragraphId, String title, String text) throws Exception {
+   JSONObject requestJson = new JSONObject();
+   requestJson.put("title", title);
+   requestJson.put("text", text);
+   String requestBody = requestJson.toString();
+
+   HttpResponse<JsonNode> updateResponse = Unirest.put("/notebook/{noteId}/paragraph/{paragraphId}")
+       .routeParam("noteId", noteId)
+       .routeParam("paragraphId", paragraphId)
+       .body(requestBody)
+       .asJson();
+   checkResponse(updateResponse);
+   checkJsonNodeStatus(updateResponse.getBody());
+ }
+
+ /**
+ * Execute paragraph with parameters in specified session. If sessionId is null or empty string, then it depends on
+ * the interpreter binding mode of Note(e.g. isolated per note), otherwise it will run in the specified session.
+ * This submits the paragraph and then polls via {@link #waitUtilParagraphFinish(String, String)}, i.e. without timeout.
+ *
+ * @param noteId id of the note that contains the paragraph
+ * @param paragraphId id of the paragraph to execute
+ * @param sessionId id of the session to run in, may be null or empty
+ * @param parameters parameters sent as <code>params</code> when the paragraph is submitted
+ * @return the {@link ParagraphResult} queried once the paragraph status is completed
+ * @throws Exception if submitting or querying the paragraph fails
+ */
+ public ParagraphResult executeParagraph(String noteId,
+ String paragraphId,
+ String sessionId,
+ Map<String, String> parameters) throws Exception {
+ // The result returned by submitParagraph is not needed, the final state is queried by the wait call.
+ submitParagraph(noteId, paragraphId, sessionId, parameters);
+ return waitUtilParagraphFinish(noteId, paragraphId);
+ }
+
+ /**
+ * Execute paragraph with parameters.
+ * Delegates to {@link #executeParagraph(String, String, String, Map)} with an empty string as sessionId.
+ *
+ * @param noteId id of the note that contains the paragraph
+ * @param paragraphId id of the paragraph to execute
+ * @param parameters parameters sent as <code>params</code> when the paragraph is submitted
+ * @return the {@link ParagraphResult} queried once the paragraph status is completed
+ * @throws Exception if submitting or querying the paragraph fails
+ */
+ public ParagraphResult executeParagraph(String noteId,
+ String paragraphId,
+ Map<String, String> parameters) throws Exception {
+ return executeParagraph(noteId, paragraphId, "", parameters);
+ }
+
+ /**
+ * Execute paragraph in specified session. If sessionId is null or empty string, then it depends on
+ * the interpreter binding mode of Note (e.g. isolated per note), otherwise it will run in the specified session.
+ * Delegates to {@link #executeParagraph(String, String, String, Map)} with an empty parameter map.
+ *
+ * @param noteId id of the note that contains the paragraph
+ * @param paragraphId id of the paragraph to execute
+ * @param sessionId id of the session to run in, may be null or empty
+ * @return the {@link ParagraphResult} queried once the paragraph status is completed
+ * @throws Exception if submitting or querying the paragraph fails
+ */
+ public ParagraphResult executeParagraph(String noteId,
+ String paragraphId,
+ String sessionId) throws Exception {
+ return executeParagraph(noteId, paragraphId, sessionId, new HashMap<>());
+ }
+
+ /**
+ * Execute paragraph.
+ * Delegates to {@link #executeParagraph(String, String, String, Map)} with an empty string as sessionId
+ * and an empty parameter map.
+ *
+ * @param noteId id of the note that contains the paragraph
+ * @param paragraphId id of the paragraph to execute
+ * @return the {@link ParagraphResult} queried once the paragraph status is completed
+ * @throws Exception if submitting or querying the paragraph fails
+ */
+ public ParagraphResult executeParagraph(String noteId, String paragraphId) throws Exception {
+ return executeParagraph(noteId, paragraphId, "", new HashMap<>());
+ }
+
+ /**
+  * Submit a paragraph for execution via <code>POST /notebook/job/{noteId}/{paragraphId}</code> and
+  * return the paragraph state queried right after the submission. You need to query
+  * {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+  *
+  * @param noteId id of the note that contains the paragraph
+  * @param paragraphId id of the paragraph to submit
+  * @param sessionId sent as the <code>sessionId</code> query parameter
+  * @param parameters sent as the <code>params</code> field of the request body
+  * @return the {@link ParagraphResult} queried right after the submission
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public ParagraphResult submitParagraph(String noteId,
+                                        String paragraphId,
+                                        String sessionId,
+                                        Map<String, String> parameters) throws Exception {
+   JSONObject requestJson = new JSONObject();
+   requestJson.put("params", parameters);
+   String requestBody = requestJson.toString();
+
+   HttpResponse<JsonNode> submitResponse = Unirest.post("/notebook/job/{noteId}/{paragraphId}")
+       .routeParam("noteId", noteId)
+       .routeParam("paragraphId", paragraphId)
+       .queryString("sessionId", sessionId)
+       .body(requestBody)
+       .asJson();
+   checkResponse(submitResponse);
+   checkJsonNodeStatus(submitResponse.getBody());
+   return queryParagraphResult(noteId, paragraphId);
+ }
+
+ /**
+ * Submit paragraph to execute with provided sessionId. Return at once the submission is completed.
+ * You need to query {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+ * Delegates to {@link #submitParagraph(String, String, String, Map)} with an empty parameter map.
+ *
+ * @param noteId id of the note that contains the paragraph
+ * @param paragraphId id of the paragraph to submit
+ * @param sessionId id of the session to run in
+ * @return the {@link ParagraphResult} queried right after the submission
+ * @throws Exception if submitting or querying the paragraph fails
+ */
+ public ParagraphResult submitParagraph(String noteId,
+ String paragraphId,
+ String sessionId) throws Exception {
+ return submitParagraph(noteId, paragraphId, sessionId, new HashMap<>());
+ }
+
+ /**
+ * Submit paragraph to execute with provided parameters. Return at once the submission is completed.
+ * You need to query {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+ * Delegates to {@link #submitParagraph(String, String, String, Map)} with an empty string as sessionId.
+ *
+ * @param noteId id of the note that contains the paragraph
+ * @param paragraphId id of the paragraph to submit
+ * @param parameters parameters sent as <code>params</code> in the request body
+ * @return the {@link ParagraphResult} queried right after the submission
+ * @throws Exception if submitting or querying the paragraph fails
+ */
+ public ParagraphResult submitParagraph(String noteId,
+ String paragraphId,
+ Map<String, String> parameters) throws Exception {
+ return submitParagraph(noteId, paragraphId, "", parameters);
+ }
+
+ /**
+ * Submit paragraph to execute. Return at once the submission is completed.
+ * You need to query {@link ParagraphResult} by yourself afterwards until paragraph execution is completed.
+ * Delegates to {@link #submitParagraph(String, String, String, Map)} with an empty string as sessionId
+ * and an empty parameter map.
+ *
+ * @param noteId id of the note that contains the paragraph
+ * @param paragraphId id of the paragraph to submit
+ * @return the {@link ParagraphResult} queried right after the submission
+ * @throws Exception if submitting or querying the paragraph fails
+ */
+ public ParagraphResult submitParagraph(String noteId, String paragraphId) throws Exception {
+ return submitParagraph(noteId, paragraphId, "", new HashMap<>());
+ }
+
+ /**
+  * Used by {@link ZSession} for creating or reusing a paragraph for executing another piece of code.
+  * Calls <code>POST /notebook/{noteId}/paragraph/next</code>.
+  *
+  * @param noteId id of the note
+  * @param maxParagraph sent as the <code>maxParagraph</code> query parameter
+  * @return the <code>message</code> field of the server response
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public String nextSessionParagraph(String noteId, int maxParagraph) throws Exception {
+   HttpResponse<JsonNode> nextResponse = Unirest.post("/notebook/{noteId}/paragraph/next")
+       .routeParam("noteId", noteId)
+       .queryString("maxParagraph", maxParagraph)
+       .asJson();
+   checkResponse(nextResponse);
+   JsonNode payload = nextResponse.getBody();
+   checkJsonNodeStatus(payload);
+   return payload.getObject().getString("message");
+ }
+
+ /**
+  * Cancel a running paragraph via <code>DELETE /notebook/job/{noteId}/{paragraphId}</code>.
+  *
+  * @param noteId id of the note that contains the paragraph
+  * @param paragraphId id of the paragraph to cancel
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public void cancelParagraph(String noteId, String paragraphId) throws Exception {
+   HttpResponse<JsonNode> cancelResponse = Unirest.delete("/notebook/job/{noteId}/{paragraphId}")
+       .routeParam("noteId", noteId)
+       .routeParam("paragraphId", paragraphId)
+       .asJson();
+   checkResponse(cancelResponse);
+   checkJsonNodeStatus(cancelResponse.getBody());
+ }
+
+ /**
+  * Query {@link ParagraphResult} via <code>GET /notebook/{noteId}/paragraph/{paragraphId}</code>.
+  *
+  * @param noteId id of the note that contains the paragraph
+  * @param paragraphId id of the paragraph to query
+  * @return {@link ParagraphResult} built from the <code>body</code> of the response
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public ParagraphResult queryParagraphResult(String noteId, String paragraphId) throws Exception {
+   HttpResponse<JsonNode> paragraphResponse = Unirest.get("/notebook/{noteId}/paragraph/{paragraphId}")
+       .routeParam("noteId", noteId)
+       .routeParam("paragraphId", paragraphId)
+       .asJson();
+   checkResponse(paragraphResponse);
+   JsonNode payload = paragraphResponse.getBody();
+   checkJsonNodeStatus(payload);
+   return new ParagraphResult(payload.getObject().getJSONObject("body"));
+ }
+
+ /**
+  * Query {@link ParagraphResult} until it is finished.
+  * Between two queries the calling thread sleeps for the query interval of the client config. There
+  * is no timeout. Each queried result is logged at debug level; parameterized logging is used so
+  * that the result is only turned into a string when debug logging is enabled.
+  *
+  * @param noteId id of the note that contains the paragraph
+  * @param paragraphId id of the paragraph to wait for
+  * @return the first {@link ParagraphResult} whose status is completed
+  * @throws Exception if querying the paragraph fails or the thread is interrupted while sleeping
+  */
+ public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId) throws Exception {
+   while (true) {
+     ParagraphResult paragraphResult = queryParagraphResult(noteId, paragraphId);
+     LOGGER.debug("{}", paragraphResult);
+     if (paragraphResult.getStatus().isCompleted()) {
+       return paragraphResult;
+     }
+     Thread.sleep(clientConfig.getQueryInterval());
+   }
+ }
+
+ /**
+  * Query {@link ParagraphResult} until it is finished or timeout.
+  * The paragraph is always queried at least once, even for a zero or negative timeout. The sleep
+  * between two queries never extends beyond the deadline, and one last query is done when the
+  * deadline is reached. Elapsed time is measured with <code>System.nanoTime()</code>, so changes of
+  * the system clock do not affect the timeout.
+  *
+  * @param noteId id of the note that contains the paragraph
+  * @param paragraphId id of the paragraph to wait for
+  * @param timeoutInMills maximum time to wait in milliseconds
+  * @return the first {@link ParagraphResult} whose status is completed
+  * @throws Exception if the paragraph is not completed after the timeout, querying the paragraph
+  *                   fails or the thread is interrupted while sleeping
+  */
+ public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId, long timeoutInMills) throws Exception {
+   long startNanos = System.nanoTime();
+   while (true) {
+     ParagraphResult paragraphResult = queryParagraphResult(noteId, paragraphId);
+     if (paragraphResult.getStatus().isCompleted()) {
+       return paragraphResult;
+     }
+     long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
+     if (elapsedMillis >= timeoutInMills) {
+       break;
+     }
+     // Never sleep past the deadline; the next iteration does the final query.
+     Thread.sleep(Math.min(clientConfig.getQueryInterval(), timeoutInMills - elapsedMillis));
+   }
+   throw new Exception("Paragraph is not finished in " + timeoutInMills / 1000 + " seconds");
+ }
+
+ /**
+  * Query {@link ParagraphResult} until it is running.
+  * The method also returns when the paragraph status is already completed. A paragraph that
+  * completes between two queries would otherwise never be seen as running and this method would
+  * block forever. Callers that need to tell the two cases apart should check the status of the
+  * returned result.
+  *
+  * @param noteId id of the note that contains the paragraph
+  * @param paragraphId id of the paragraph to wait for
+  * @return the first {@link ParagraphResult} whose status is running or completed
+  * @throws Exception if querying the paragraph fails or the thread is interrupted while sleeping
+  */
+ public ParagraphResult waitUtilParagraphRunning(String noteId, String paragraphId) throws Exception {
+   while (true) {
+     ParagraphResult paragraphResult = queryParagraphResult(noteId, paragraphId);
+     if (paragraphResult.getStatus().isRunning() || paragraphResult.getStatus().isCompleted()) {
+       return paragraphResult;
+     }
+     Thread.sleep(clientConfig.getQueryInterval());
+   }
+ }
+
+ /**
+  * This is equal to the restart operation in note page. Calls
+  * <code>PUT /interpreter/setting/restart/{interpreter}</code> with a json body that carries the
+  * <code>noteId</code>.
+  *
+  * @param noteId id of the note, sent in the request body
+  * @param interpreter interpreter used as route parameter of the request
+  * @throws Exception if the http status is not 200 or the json status is not OK
+  */
+ public void stopInterpreter(String noteId, String interpreter) throws Exception {
+   JSONObject requestJson = new JSONObject();
+   requestJson.put("noteId", noteId);
+   String requestBody = requestJson.toString();
+
+   HttpResponse<JsonNode> restartResponse = Unirest.put("/interpreter/setting/restart/{interpreter}")
+       .routeParam("interpreter", interpreter)
+       .body(requestBody)
+       .asJson();
+   checkResponse(restartResponse);
+   checkJsonNodeStatus(restartResponse.getBody());
+ }
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/AbstractMessageHandler.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/AbstractMessageHandler.java
new file mode 100644
index 0000000..1aa9eee
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/AbstractMessageHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract class of MessageHandler.
+ */
+public abstract class AbstractMessageHandler implements MessageHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageHandler.class);
+ private static final Gson GSON = new Gson();
+
+ @Override
+ public void onMessage(String msg) {
+ try {
+ Message messageReceived = GSON.fromJson(msg, Message.class);
+ if (messageReceived.op != Message.OP.PING) {
+ LOGGER.debug("RECEIVE: " + messageReceived.op +
+ ", RECEIVE PRINCIPAL: " + messageReceived.principal +
+ ", RECEIVE TICKET: " + messageReceived.ticket +
+ ", RECEIVE ROLES: " + messageReceived.roles +
+ ", RECEIVE DATA: " + messageReceived.data);
+ }
+
+ switch (messageReceived.op) {
+ case PARAGRAPH_UPDATE_OUTPUT:
+ String noteId = (String) messageReceived.data.get("noteId");
+ String paragraphId = (String) messageReceived.data.get("paragraphId");
+ int index = (int) Double.parseDouble(messageReceived.data.get("index").toString());
+ String type = (String) messageReceived.data.get("type");
+ String output = (String) messageReceived.data.get("data");
+ onStatementUpdateOutput(paragraphId, index, type, output);
+ break;
+ case PARAGRAPH_APPEND_OUTPUT:
+ noteId = (String) messageReceived.data.get("noteId");
+ paragraphId = (String) messageReceived.data.get("paragraphId");
+ index = (int) Double.parseDouble(messageReceived.data.get("index").toString());
+ output = (String) messageReceived.data.get("data");
+ onStatementAppendOutput(paragraphId, index, output);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Can't handle message: " + msg, e);
+ }
+ }
+
+ /**
+ * Invoked when there's new statement output appended.
+ *
+ * @param statementId
+ * @param index
+ * @param output
+ */
+ public abstract void onStatementAppendOutput(String statementId, int index, String output);
+
+ /**
+ * Invoked when statement's output is updated.
+ *
+ * @param statementId
+ * @param index
+ * @param type
+ * @param output
+ */
+ public abstract void onStatementUpdateOutput(String statementId, int index, String type, String output);
+
+}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/CompositeMessageHandler.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/CompositeMessageHandler.java
new file mode 100644
index 0000000..51169fb
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/CompositeMessageHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * CompositeMessageHandler allos you to add StatementHandler for each statement.
+ */
+public class CompositeMessageHandler extends AbstractMessageHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CompositeMessageHandler.class);
+
+ private Map<String, StatementMessageHandler> messageHandlers = new HashMap();
+
+ public void addStatementMessageHandler(String statementId,
+ StatementMessageHandler messageHandler) {
+ messageHandlers.put(statementId, messageHandler);
+ }
+
+ @Override
+ public void onStatementAppendOutput(String statementId, int index, String output) {
+ StatementMessageHandler messageHandler = messageHandlers.get(statementId);
+ if (messageHandler == null) {
+ LOGGER.warn("No messagehandler for statement: " + statementId);
+ return;
+ }
+ messageHandler.onStatementAppendOutput(statementId, index, output);
+ }
+
+ @Override
+ public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+ StatementMessageHandler messageHandler = messageHandlers.get(statementId);
+ if (messageHandler == null) {
+ LOGGER.warn("No messagehandler for statement: " + statementId);
+ return;
+ }
+ messageHandler.onStatementUpdateOutput(statementId, index, type, output);
+ }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/JsonSerializable.java
similarity index 57%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/JsonSerializable.java
index 8a6d0d0..914243c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/JsonSerializable.java
@@ -14,32 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.rest.message;
-import com.google.gson.Gson;
-
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client.websocket;
/**
- * RestartInterpreter rest api request message.
+ * Interface for class that can be serialized to json
*/
-public class RestartInterpreterRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
-
- String noteId;
-
- public RestartInterpreterRequest() {
- }
-
- public String getNoteId() {
- return noteId;
- }
-
- public String toJson() {
- return gson.toJson(this);
- }
+public interface JsonSerializable {
- public static RestartInterpreterRequest fromJson(String json) {
- return gson.fromJson(json, RestartInterpreterRequest.class);
- }
+ String toJson();
}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/Message.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/Message.java
new file mode 100644
index 0000000..cca3a46
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/Message.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Copied from zeppelin-zengine (TODO, zjffdu). Should resume the same piece of code instead of copying.
+ * Zeppelin websocket message template class.
+ */
+public class Message implements JsonSerializable {
+ /**
+ * Representation of event type.
+ */
+ public static enum OP {
+ GET_HOME_NOTE, // [c-s] load note for home screen
+
+ GET_NOTE, // [c-s] client load note
+ // @param id note id
+
+ NOTE, // [s-c] note info
+ // @param note serialized Note object
+
+ PARAGRAPH, // [s-c] paragraph info
+ // @param paragraph serialized paragraph object
+
+ PROGRESS, // [s-c] progress update
+ // @param id paragraph id
+ // @param progress percentage progress
+
+ NEW_NOTE, // [c-s] create new notebook
+ DEL_NOTE, // [c-s] delete notebook
+ // @param id note id
+ REMOVE_FOLDER,
+ MOVE_NOTE_TO_TRASH,
+ MOVE_FOLDER_TO_TRASH,
+ RESTORE_FOLDER,
+ RESTORE_NOTE,
+ RESTORE_ALL,
+ EMPTY_TRASH,
+ CLONE_NOTE, // [c-s] clone new notebook
+ // @param id id of note to clone
+ // @param name name for the cloned note
+ IMPORT_NOTE, // [c-s] import notebook
+ // @param object notebook
+
+ CONVERT_NOTE_NBFORMAT, // [c-s] converting note to nbformat
+ CONVERTED_NOTE_NBFORMAT, // [s-c] converting note to nbformat
+
+ NOTE_UPDATE,
+
+ NOTE_RENAME,
+
+ UPDATE_PERSONALIZED_MODE, // [c-s] update personalized mode (boolean)
+ // @param note id and boolean personalized mode value
+
+ FOLDER_RENAME,
+
+ RUN_PARAGRAPH, // [c-s] run paragraph
+ // @param id paragraph id
+ // @param paragraph paragraph content.ie. script
+ // @param config paragraph config
+ // @param params paragraph params
+
+ COMMIT_PARAGRAPH, // [c-s] commit paragraph
+ // @param id paragraph id
+ // @param title paragraph title
+ // @param paragraph paragraph content.ie. script
+ // @param config paragraph config
+ // @param params paragraph params
+
+ CANCEL_PARAGRAPH, // [c-s] cancel paragraph run
+ // @param id paragraph id
+
+ MOVE_PARAGRAPH, // [c-s] move paragraph order
+ // @param id paragraph id
+ // @param index index the paragraph want to go
+
+ INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph
+ // @param target index
+
+ COPY_PARAGRAPH, // [c-s] create new para below current para as a copy of current para
+ // @param target index
+ // @param title paragraph title
+ // @param paragraph paragraph content.ie. script
+ // @param config paragraph config
+ // @param params paragraph params
+
+ EDITOR_SETTING, // [c-s] ask paragraph editor setting
+ // @param paragraph text keyword written in paragraph
+ // ex) spark.spark or spark
+
+ COMPLETION, // [c-s] ask completion candidates
+ // @param id
+ // @param buf current code
+ // @param cursor cursor position in code
+
+ COMPLETION_LIST, // [s-c] send back completion candidates list
+ // @param id
+ // @param completions list of string
+
+ LIST_NOTES, // [c-s] ask list of note
+ RELOAD_NOTES_FROM_REPO, // [c-s] reload notes from repo
+
+ NOTES_INFO, // [s-c] list of note infos
+ // @param notes serialized List<NoteInfo> object
+
+ PARAGRAPH_REMOVE,
+ PARAGRAPH_CLEAR_OUTPUT, // [c-s] clear output of paragraph
+ PARAGRAPH_CLEAR_ALL_OUTPUT, // [c-s] clear output of all paragraphs
+ PARAGRAPH_APPEND_OUTPUT, // [s-c] append output
+ PARAGRAPH_UPDATE_OUTPUT, // [s-c] update (replace) output
+ PING,
+ AUTH_INFO,
+
+ ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object
+ ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del
+
+ ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated,
+
+ ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object
+
+ ANGULAR_OBJECT_CLIENT_UNBIND, // [c-s] angular object unbind from AngularJS z object
+
+ LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations
+ CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
+ // @param settings serialized Map<String, String> object
+
+ CHECKPOINT_NOTE, // [c-s] checkpoint note to storage repository
+ // @param noteId
+ // @param checkpointName
+
+ LIST_REVISION_HISTORY, // [c-s] list revision history of the notebook
+ // @param noteId
+ NOTE_REVISION, // [c-s] get certain revision of note
+ // @param noteId
+ // @param revisionId
+ SET_NOTE_REVISION, // [c-s] set current notebook head to this revision
+ // @param noteId
+ // @param revisionId
+ NOTE_REVISION_FOR_COMPARE, // [c-s] get certain revision of note for compare
+ // @param noteId
+ // @param revisionId
+ // @param position
+ APP_APPEND_OUTPUT, // [s-c] append output
+ APP_UPDATE_OUTPUT, // [s-c] update (replace) output
+ APP_LOAD, // [s-c] on app load
+ APP_STATUS_CHANGE, // [s-c] on app status change
+
+ LIST_NOTE_JOBS, // [c-s] get note job management information
+ LIST_UPDATE_NOTE_JOBS, // [c-s] get job management information for until unixtime
+ UNSUBSCRIBE_UPDATE_NOTE_JOBS, // [c-s] unsubscribe job information for job management
+ // @param unixTime
+ GET_INTERPRETER_BINDINGS, // [c-s] get interpreter bindings
+ SAVE_INTERPRETER_BINDINGS, // [c-s] save interpreter bindings
+ INTERPRETER_BINDINGS, // [s-c] interpreter bindings
+
+ GET_INTERPRETER_SETTINGS, // [c-s] get interpreter settings
+ INTERPRETER_SETTINGS, // [s-c] interpreter settings
+ ERROR_INFO, // [s-c] error information to be sent
+ SESSION_LOGOUT, // [s-c] error information to be sent
+ WATCHER, // [s-c] Change websocket to watcher mode.
+ PARAGRAPH_ADDED, // [s-c] paragraph is added
+ PARAGRAPH_REMOVED, // [s-c] paragraph deleted
+ PARAGRAPH_MOVED, // [s-c] paragraph moved
+ NOTE_UPDATED, // [s-c] paragraph updated(name, config)
+ RUN_ALL_PARAGRAPHS, // [c-s] run all paragraphs
+ PARAGRAPH_EXECUTED_BY_SPELL, // [c-s] paragraph was executed by spell
+ RUN_PARAGRAPH_USING_SPELL, // [s-c] run paragraph using spell
+ PARAS_INFO, // [s-c] paragraph runtime infos
+ SAVE_NOTE_FORMS, // save note forms
+ REMOVE_NOTE_FORMS, // remove note forms
+ INTERPRETER_INSTALL_STARTED, // [s-c] start to download an interpreter
+ INTERPRETER_INSTALL_RESULT, // [s-c] Status of an interpreter installation
+ COLLABORATIVE_MODE_STATUS, // [s-c] collaborative mode status
+ PATCH_PARAGRAPH, // [c-s][s-c] patch editor text
+ NOTE_RUNNING_STATUS, // [s-c] sequential run status will be change
+ NOTICE // [s-c] Notice
+ }
+
+ // these messages will be ignored during the sequential run of the note
+ private static final Set<OP> disabledForRunningNoteMessages = Collections
+ .unmodifiableSet(new HashSet<>(Arrays.asList(
+ OP.COMMIT_PARAGRAPH,
+ OP.RUN_PARAGRAPH,
+ OP.RUN_PARAGRAPH_USING_SPELL,
+ OP.RUN_ALL_PARAGRAPHS,
+ OP.PARAGRAPH_CLEAR_OUTPUT,
+ OP.PARAGRAPH_CLEAR_ALL_OUTPUT,
+ OP.INSERT_PARAGRAPH,
+ OP.MOVE_PARAGRAPH,
+ OP.COPY_PARAGRAPH,
+ OP.PARAGRAPH_REMOVE,
+ OP.MOVE_NOTE_TO_TRASH,
+ OP.DEL_NOTE,
+ OP.PATCH_PARAGRAPH)));
+
+ private static final Gson gson = new Gson();
+ public static final Message EMPTY = new Message(null);
+
+ public OP op;
+ public Map<String, Object> data = new HashMap<>();
+ public String ticket = "anonymous";
+ public String principal = "anonymous";
+ public String roles = "";
+
+ // Unique id generated from client side. to identify message.
+ // When message from server is response to the client request
+ // includes the msgId in response message, client can pair request and response message.
+ // When server send message that is not response to the client request, set null;
+ public String msgId = MSG_ID_NOT_DEFINED;
+ public static String MSG_ID_NOT_DEFINED = null;
+
+ public Message(OP op) {
+ this.op = op;
+ }
+
+ public Message withMsgId(String msgId) {
+ this.msgId = msgId;
+ return this;
+ }
+
+ public Message put(String k, Object v) {
+ data.put(k, v);
+ return this;
+ }
+
+ public Object get(String k) {
+ return data.get(k);
+ }
+
+ public static boolean isDisabledForRunningNotes(OP eventType) {
+ return disabledForRunningNoteMessages.contains(eventType);
+ }
+
+ public <T> T getType(String key) {
+ return (T) data.get(key);
+ }
+
+ public <T> T getType(String key, Logger LOG) {
+ try {
+ return getType(key);
+ } catch (ClassCastException e) {
+ LOG.error("Failed to get " + key + " from message (Invalid type). " , e);
+ return null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("Message{");
+ sb.append("data=").append(data);
+ sb.append(", op=").append(op);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ public static Message fromJson(String json) {
+ return gson.fromJson(json, Message.class);
+ }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/MessageHandler.java
similarity index 57%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/MessageHandler.java
index 8a6d0d0..5cb8f20 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/MessageHandler.java
@@ -14,32 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.rest.message;
-import com.google.gson.Gson;
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client.websocket;
/**
- * RestartInterpreter rest api request message.
+ * Interface of how to handle websocket message sent from ZeppelinServer.
*/
-public class RestartInterpreterRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
+public interface MessageHandler {
- String noteId;
+ void onMessage(String msg);
- public RestartInterpreterRequest() {
- }
-
- public String getNoteId() {
- return noteId;
- }
-
- public String toJson() {
- return gson.toJson(this);
- }
-
- public static RestartInterpreterRequest fromJson(String json) {
- return gson.fromJson(json, RestartInterpreterRequest.class);
- }
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/SimpleMessageHandler.java
similarity index 55%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/SimpleMessageHandler.java
index 8a6d0d0..ca7b881 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/SimpleMessageHandler.java
@@ -14,32 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.rest.message;
-import com.google.gson.Gson;
+package org.apache.zeppelin.client.websocket;
-import org.apache.zeppelin.common.JsonSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * RestartInterpreter rest api request message.
+ * Simple implementation of AbstractMessageHandler which only print output.
*/
-public class RestartInterpreterRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
+public class SimpleMessageHandler extends AbstractMessageHandler {
- String noteId;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleMessageHandler.class);
- public RestartInterpreterRequest() {
+ @Override
+ public void onStatementAppendOutput(String statementId, int index, String output) {
+ LOGGER.info("Append output, data: {}", output);
}
- public String getNoteId() {
- return noteId;
- }
-
- public String toJson() {
- return gson.toJson(this);
- }
-
- public static RestartInterpreterRequest fromJson(String json) {
- return gson.fromJson(json, RestartInterpreterRequest.class);
+ @Override
+ public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
+ LOGGER.info("Update output, type: {}, data: {}", type, output);
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/StatementMessageHandler.java
similarity index 57%
copy from zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
copy to zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/StatementMessageHandler.java
index 8a6d0d0..2ee9e6b 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/StatementMessageHandler.java
@@ -14,32 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.rest.message;
-import com.google.gson.Gson;
-
-import org.apache.zeppelin.common.JsonSerializable;
+package org.apache.zeppelin.client.websocket;
/**
- * RestartInterpreter rest api request message.
+ * MessageHandler for each statement.
*/
-public class RestartInterpreterRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
-
- String noteId;
-
- public RestartInterpreterRequest() {
- }
-
- public String getNoteId() {
- return noteId;
- }
-
- public String toJson() {
- return gson.toJson(this);
- }
+public interface StatementMessageHandler {
+
+ /**
+ * Invoked when there's new statement output appended.
+ *
+ * @param statementId
+ * @param index
+ * @param output
+ */
+ void onStatementAppendOutput(String statementId, int index, String output);
+
+ /**
+ * Invoked when statement's output is updated.
+ *
+ * @param statementId
+ * @param index
+ * @param type
+ * @param output
+ */
+ void onStatementUpdateOutput(String statementId, int index, String type, String output);
- public static RestartInterpreterRequest fromJson(String json) {
- return gson.fromJson(json, RestartInterpreterRequest.class);
- }
}
diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/ZeppelinWebSocketClient.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/ZeppelinWebSocketClient.java
new file mode 100644
index 0000000..4a72c0a
--- /dev/null
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/websocket/ZeppelinWebSocketClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.client.websocket;
+
+import com.google.gson.Gson;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Represent websocket client.
+ *
+ */
+@WebSocket(maxTextMessageSize = 10 * 1000 * 1024 )
+public class ZeppelinWebSocketClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinWebSocketClient.class);
+ private static final Gson GSON = new Gson();
+
+ private CountDownLatch connectLatch = new CountDownLatch(1);
+ private CountDownLatch closeLatch = new CountDownLatch(1);
+
+ private Session session;
+ private MessageHandler messageHandler;
+ private WebSocketClient wsClient;
+
+ public ZeppelinWebSocketClient(MessageHandler messageHandler) {
+ this.messageHandler = messageHandler;
+ }
+
+ public void connect(String url) throws Exception {
+ this.wsClient = new WebSocketClient();
+ wsClient.start();
+ URI echoUri = new URI(url);
+ ClientUpgradeRequest request = new ClientUpgradeRequest();
+ request.setHeader("Origin", "*");
+ wsClient.connect(this, echoUri, request);
+ connectLatch.await();
+ LOGGER.info("WebSocket connect established");
+ }
+
+ public void addStatementMessageHandler(String statementId,
+ StatementMessageHandler statementMessageHandler) throws Exception {
+ if (messageHandler instanceof CompositeMessageHandler) {
+ ((CompositeMessageHandler) messageHandler).addStatementMessageHandler(statementId, statementMessageHandler);
+ } else {
+ throw new Exception("StatementMessageHandler is only supported by: "
+ + CompositeMessageHandler.class.getSimpleName());
+ }
+ }
+
+ public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
+ return this.closeLatch.await(duration, unit);
+ }
+
+ @OnWebSocketClose
+ public void onClose(int statusCode, String reason) {
+ LOGGER.info("Connection closed, statusCode: {} - reason: {}", statusCode, reason);
+ this.session = null;
+ this.closeLatch.countDown();
+ }
+
+ @OnWebSocketConnect
+ public void onConnect(Session session) {
+ LOGGER.info("Got connect: {}", session.getRemote());
+ this.session = session;
+ connectLatch.countDown();
+ }
+
+ @OnWebSocketMessage
+ public void onText(Session session, String message) throws IOException {
+ messageHandler.onMessage(message);
+ }
+
+ @OnWebSocketError
+ public void onError(Throwable cause) {
+ LOGGER.info("WebSocket Error: " + cause.getMessage());
+ cause.printStackTrace(System.out);
+ }
+
+ public void send(Message message) throws IOException {
+ session.getRemote().sendString(GSON.toJson(message));
+ }
+
+ public CountDownLatch getConnectLatch() {
+ return connectLatch;
+ }
+
+ public void stop() throws Exception {
+ if (this.wsClient != null) {
+ this.wsClient.stop();
+ }
+ }
+
+}
diff --git a/zeppelin-integration/src/test/resources/log4j.properties b/zeppelin-integration/src/test/resources/log4j.properties
index 8368993..c666d57 100644
--- a/zeppelin-integration/src/test/resources/log4j.properties
+++ b/zeppelin-integration/src/test/resources/log4j.properties
@@ -41,6 +41,3 @@ log4j.logger.DataNucleus.Datastore=ERROR
# Log all JDBC parameters
log4j.logger.org.hibernate.type=ALL
-
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
-log4j.logger.org.apache.zeppelin.spark=DEBUG
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index 71ab23f..5958858 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -45,6 +45,18 @@
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-client</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-zengine</artifactId>
<version>${project.version}</version>
<exclusions>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
new file mode 100644
index 0000000..ac9c5d7
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ExecuteResult;
+import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
+import org.apache.zeppelin.client.Status;
+import org.apache.zeppelin.client.ZSession;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ZSessionIntegrationTest extends AbstractTestRestApi {
+
+ private static Notebook notebook;
+ private static String sparkHome;
+ private static String flinkHome;
+
+ private ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
+
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
+ "helium");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ALLOWED_ORIGINS.getVarName(), "*");
+
+ AbstractTestRestApi.startUp(ZSessionIntegrationTest.class.getSimpleName());
+ notebook = TestUtils.getInstance(Notebook.class);
+ sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
+ flinkHome = DownloadUtils.downloadFlink("1.10.1");
+ }
+
+ @AfterClass
+ public static void destroy() throws Exception {
+ AbstractTestRestApi.shutDown();
+ }
+
+ @Test
+ public void testZSession_Shell() throws Exception {
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("sh")
+ .build();
+
+ try {
+ session.start();
+ assertNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ Note note = notebook.getNote(session.getNoteId());
+ assertEquals(2, note.getParagraphCount());
+ assertTrue(note.getParagraph(0).getText(), note.getParagraph(0).getText().startsWith("%sh.conf"));
+
+ ExecuteResult result = session.execute("pwd");
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+
+ result = session.execute("invalid_command");
+ assertEquals(Status.ERROR, result.getStatus());
+ assertEquals(2, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("command not found"));
+ assertEquals("TEXT", result.getResults().get(1).getType());
+ assertTrue(result.getResults().get(1).getData(), result.getResults().get(1).getData().contains("ExitValue"));
+
+ assertEquals(4, note.getParagraphCount());
+ assertEquals("%sh\ninvalid_command", note.getParagraph(3).getText());
+
+ } finally {
+ session.stop();
+ }
+ }
+
+ @Test
+ public void testZSession_Shell_Submit() throws Exception {
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("sh")
+ .build();
+
+ try {
+ session.start();
+ assertNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ Note note = notebook.getNote(session.getNoteId());
+ assertEquals(2, note.getParagraphCount());
+ assertTrue(note.getParagraph(0).getText(), note.getParagraph(0).getText().startsWith("%sh.conf"));
+
+ ExecuteResult result = session.submit("sleep 10\npwd");
+ assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+
+ result = session.submit("invalid_command");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(Status.ERROR, result.getStatus());
+ assertEquals(2, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("command not found"));
+ assertEquals("TEXT", result.getResults().get(1).getType());
+ assertTrue(result.getResults().get(1).getData(), result.getResults().get(1).getData().contains("ExitValue"));
+
+ assertEquals(4, note.getParagraphCount());
+ assertEquals("%sh\ninvalid_command", note.getParagraph(3).getText());
+
+ } finally {
+ session.stop();
+ }
+ }
+
+ @Test
+ public void testZSession_Spark() throws Exception {
+ Map<String, String> intpProperties = new HashMap<>();
+ intpProperties.put("SPARK_HOME", sparkHome);
+ intpProperties.put("spark.master", "local[*]");
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("spark")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start();
+ assertNotNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ // scala
+ ExecuteResult result = session.execute("sc.version");
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2.4.4"));
+ assertEquals(0, result.getJobUrls().size());
+
+ // pyspark
+ result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.registerTempTable('df')\ndf.show()");
+ assertEquals(Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertEquals(
+ "+---+---+\n" +
+ "| _1| _2|\n" +
+ "+---+---+\n" +
+ "| 1| a|\n" +
+ "| 2| b|\n" +
+ "+---+---+", result.getResults().get(0).getData().trim());
+ assertTrue(result.getJobUrls().size() > 0);
+
+ // sparkr
+ result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
+ assertEquals(Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
+ assertEquals(2, result.getJobUrls().size());
+
+ // spark sql
+ result = session.execute("sql", "select * from df");
+ assertEquals(Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TABLE", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1\ta\n2\tb\n"));
+ assertTrue(result.getJobUrls().size() > 0);
+
+ // spark invalid sql
+ result = session.execute("sql", "select * from unknown_table");
+ assertEquals(Status.ERROR, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("NoSuchTableException"));
+ assertEquals(0, result.getJobUrls().size());
+
+ } finally {
+ session.stop();
+ }
+ }
+
+ @Test
+ public void testZSession_Spark_Submit() throws Exception {
+ Map<String, String> intpProperties = new HashMap<>();
+ intpProperties.put("SPARK_HOME", sparkHome);
+ intpProperties.put("spark.master", "local[*]");
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("spark")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start();
+ assertNotNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ // scala
+ ExecuteResult result = session.submit("sc.version");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2.4.4"));
+ assertEquals(0, result.getJobUrls().size());
+
+ // pyspark
+ result = session.submit("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.registerTempTable('df')\ndf.show()");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertEquals(
+ "+---+---+\n" +
+ "| _1| _2|\n" +
+ "+---+---+\n" +
+ "| 1| a|\n" +
+ "| 2| b|\n" +
+ "+---+---+", result.getResults().get(0).getData().trim());
+ assertTrue(result.getJobUrls().size() > 0);
+
+ // sparkr
+ result = session.submit("r", "df <- as.DataFrame(faithful)\nhead(df)");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("eruptions waiting"));
+ assertEquals(2, result.getJobUrls().size());
+
+ // spark sql
+ result = session.submit("sql", "select * from df");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TABLE", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1\ta\n2\tb\n"));
+ assertTrue(result.getJobUrls().size() > 0);
+
+ // spark invalid sql
+ result = session.submit("sql", "select * from unknown_table");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(Status.ERROR, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("NoSuchTableException"));
+ assertEquals(0, result.getJobUrls().size());
+
+ // cancel
+ result = session.submit("sc.range(1,100).map(e=>{Thread.sleep(1000);e}).collect()");
+ assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+ result = session.waitUntilRunning(result.getStatementId());
+ session.cancel(result.getStatementId());
+ assertEquals(result.toString(), Status.RUNNING, result.getStatus());
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.ABORT, result.getStatus());
+
+ } finally {
+ session.stop();
+ }
+ }
+
+ /**
+ * Runs statements synchronously (via {@code session.execute}) against a ZSession bound to the
+ * flink interpreter: a Scala batch snippet, the init-stream script, and a SQL query passed
+ * with "ssql" as first argument and the local property type=update.
+ * The session is stopped in the finally block whether or not an assertion fails.
+ */
+ @Test
+ public void testZSession_Flink() throws Exception {
+ Map<String, String> intpProperties = new HashMap<>();
+ intpProperties.put("FLINK_HOME", flinkHome);
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("flink")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start();
+ assertNotNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ // scala: a single TEXT result containing the collected elements is expected
+ ExecuteResult result = session.execute("val data = benv.fromElements(1, 2, 3)\ndata.collect()");
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1, 2, 3"));
+
+ // sql: run the init-stream script first (presumably it sets up the `log` table queried
+ // below - verify against init_stream.scala), then run the query
+ result = session.execute(getInitStreamScript(200));
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ Map<String, String> localProperties = new HashMap<>();
+ localProperties.put("type", "update");
+ result = session.execute("ssql", localProperties, "select url, count(1) as pv from log group by url");
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+
+ } finally {
+ session.stop();
+ }
+ }
+
+ /**
+ * Same scenario as the synchronous flink test, but driven through {@code session.submit}
+ * followed by {@code session.waitUntilFinished}, plus a cancel case.
+ * The session is started with a {@code SimpleMessageHandler} and always stopped in the
+ * finally block.
+ */
+ @Test
+ public void testZSession_Flink_Submit() throws Exception {
+ Map<String, String> intpProperties = new HashMap<>();
+ intpProperties.put("FLINK_HOME", flinkHome);
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("flink")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start(new SimpleMessageHandler());
+ assertNotNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ // scala
+ ExecuteResult result = session.submit("val data = benv.fromElements(1, 2, 3)\ndata.collect()");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("1, 2, 3"));
+
+ // sql
+ result = session.submit(getInitStreamScript(200));
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ Map<String, String> localProperties = new HashMap<>();
+ localProperties.put("type", "update");
+ result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+ // the result returned directly by submit() is expected to be not yet completed
+ assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+
+ // cancel
+ result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
+ assertFalse("Status is: " + result.getStatus().toString(), result.getStatus().isCompleted());
+ result = session.waitUntilRunning(result.getStatementId());
+ session.cancel(result.getStatementId());
+ // `result` is still the snapshot returned by waitUntilRunning (it is not re-fetched after
+ // cancel), so this checks the status observed before the cancel request
+ assertEquals(result.toString(), Status.RUNNING, result.getStatus());
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.ABORT, result.getStatus());
+ } finally {
+ session.stop();
+ }
+ }
+
+ /**
+ * Starts a ZSession bound to the python interpreter (no interpreter properties set) and
+ * executes a loop that prints 1..9 with a one-second sleep per iteration, then checks that
+ * a single TEXT result is returned. Unlike the flink tests, the web url is asserted to be null.
+ */
+ @Test
+ public void testZSession_Python() throws Exception {
+ Map<String, String> intpProperties = new HashMap<>();
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("python")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start(new SimpleMessageHandler());
+ assertNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ // python
+ // NOTE(review): the two checks below ("1+1" and "1/0") are commented out and do not run;
+ // the reason is not visible here - re-enable or remove.
+// ExecuteResult result = session.execute("1+1");
+// assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+// assertEquals(1, result.getResults().size());
+// assertEquals("TEXT", result.getResults().get(0).getType());
+// assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("2"));
+//
+// // python
+// result = session.execute("1/0");
+// assertEquals(result.toString(), Status.ERROR, result.getStatus());
+// assertEquals(1, result.getResults().size());
+// assertEquals("TEXT", result.getResults().get(0).getType());
+// assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("ZeroDivisionError"));
+
+ // for loop: nine iterations, each sleeping one second, so this statement is slow by design
+ ExecuteResult result = session.execute("import time\n" +
+ "for i in range(1,10):\n" +
+ "\tprint(i)\n" +
+ "\ttime.sleep(1)");
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TEXT", result.getResults().get(0).getType());
+ } finally {
+ session.stop();
+ }
+ }
+
+ /**
+ * Synchronous execution against a ZSession bound to the jdbc interpreter, configured for a
+ * MySQL server at localhost:3306 with user root.
+ * The {@code @Test} annotation is commented out, so JUnit does not run this method
+ * (presumably because it needs a locally running MySQL - confirm).
+ */
+ //@Test
+ public void testZSession_Jdbc() throws Exception {
+
+ Map<String, String> intpProperties = new HashMap<>();
+ intpProperties.put("default.driver", "com.mysql.jdbc.Driver");
+ intpProperties.put("default.url", "jdbc:mysql://localhost:3306/");
+ intpProperties.put("default.user", "root");
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("jdbc")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start();
+ // for jdbc the web url is expected to be an empty string rather than null
+ assertEquals("", session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ // show databases
+ ExecuteResult result = session.execute("show databases");
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TABLE", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Database"));
+
+ // select statement: the TABLE payload is tab-separated, header row first
+ result = session.execute("SELECT 1 as c1, 2 as c2");
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TABLE", result.getResults().get(0).getType());
+ assertEquals("c1\tc2\n1\t2\n", result.getResults().get(0).getData());
+
+ } finally {
+ session.stop();
+ }
+ }
+
+ /**
+ * Asynchronous variant of the jdbc test: each statement is sent with {@code session.submit}
+ * and then awaited with {@code session.waitUntilFinished}.
+ * The {@code @Test} annotation is commented out, so JUnit does not run this method
+ * (presumably because it needs a locally running MySQL - confirm).
+ */
+ //@Test
+ public void testZSession_Jdbc_Submit() throws Exception {
+
+ Map<String, String> intpProperties = new HashMap<>();
+ intpProperties.put("default.driver", "com.mysql.jdbc.Driver");
+ intpProperties.put("default.url", "jdbc:mysql://localhost:3306/");
+ intpProperties.put("default.user", "root");
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("jdbc")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start();
+ assertEquals("", session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ // show databases
+ ExecuteResult result = session.submit("show databases");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TABLE", result.getResults().get(0).getType());
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Database"));
+
+ // select statement: the TABLE payload is tab-separated, header row first
+ result = session.submit("SELECT 1 as c1, 2 as c2");
+ result = session.waitUntilFinished(result.getStatementId());
+ assertEquals(result.toString(), Status.FINISHED, result.getStatus());
+ assertEquals(1, result.getResults().size());
+ assertEquals("TABLE", result.getResults().get(0).getType());
+ assertEquals("c1\tc2\n1\t2\n", result.getResults().get(0).getData());
+
+ } finally {
+ session.stop();
+ }
+ }
+
+ /**
+ * Loads the {@code /init_stream.scala} classpath resource and substitutes its
+ * {@code {{sleep_interval}}} placeholder with the given value.
+ *
+ * @param sleep_interval value written in place of the placeholder
+ * @return the script text with the placeholder replaced
+ * @throws IOException if the resource cannot be read
+ */
+ public static String getInitStreamScript(int sleep_interval) throws IOException {
+ // Decode explicitly as UTF-8 instead of relying on the platform default charset,
+ // which is what the single-argument IOUtils.toString(URL) overload uses.
+ return IOUtils.toString(ZSessionIntegrationTest.class.getResource("/init_stream.scala"),
+ java.nio.charset.StandardCharsets.UTF_8)
+ .replace("{{sleep_interval}}", String.valueOf(sleep_interval));
+ }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java
new file mode 100644
index 0000000..820ebff
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.Status;
+import org.apache.zeppelin.client.ZeppelinClient;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ZeppelinClientIntegrationTest extends AbstractTestRestApi {
+ private static Notebook notebook;
+
+ private static ClientConfig clientConfig;
+ private static ZeppelinClient zeppelinClient;
+
+ /**
+ * One-time fixture: sets the helium registry and allowed-origins system properties, starts
+ * the test server through {@code AbstractTestRestApi.startUp}, looks up the {@link Notebook}
+ * instance and creates the shared client.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // system properties are set before startUp so the server started below can pick them up
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
+ "helium");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ALLOWED_ORIGINS.getVarName(), "*");
+
+ AbstractTestRestApi.startUp(ZeppelinClientIntegrationTest.class.getSimpleName());
+ notebook = TestUtils.getInstance(Notebook.class);
+
+ // assumes the test server listens on localhost:8080 - TODO confirm against AbstractTestRestApi
+ clientConfig = new ClientConfig("http://localhost:8080");
+ zeppelinClient = new ZeppelinClient(clientConfig);
+ }
+
+ /** One-time teardown: delegates to {@code AbstractTestRestApi.shutDown()}. */
+ @AfterClass
+ public static void destroy() throws Exception {
+ AbstractTestRestApi.shutDown();
+ }
+
+ /**
+ * Fetches the server version through the client and checks that a value is returned.
+ */
+ @Test
+ public void testZeppelinVersion() throws Exception {
+ String version = zeppelinClient.getVersion();
+ // Without an assertion this test could only fail if getVersion() threw.
+ assertNotNull("Zeppelin version should not be null", version);
+ LOG.info("Zeppelin version: " + version);
+ }
+
+ /**
+ * Covers the note lifecycle through the client: create, duplicate create (must fail),
+ * query result of a fresh note, query of an unknown note (must fail), delete, and a second
+ * delete of the same note (must fail).
+ */
+ @Test
+ public void testNoteOperation() throws Exception {
+ String noteId = zeppelinClient.createNote("/project_1/note1");
+ // the note must be visible on the server side through the Notebook instance
+ assertNotNull(notebook.getNote(noteId));
+
+ // creating a note at the same path again must fail
+ try {
+ zeppelinClient.createNote("/project_1/note1");
+ fail("Should fail to create duplicated note");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("existed"));
+ }
+
+ // query NoteResult
+ NoteResult noteResult = zeppelinClient.queryNoteResult(noteId);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ // note is created with 0 paragraph.
+ assertEquals(0, noteResult.getParagraphResultList().size());
+
+ // querying a non-existent note must fail
+ try {
+ zeppelinClient.queryNoteResult("unknown-noteId");
+ fail("Should fail to query non-existed note");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+ }
+
+ zeppelinClient.deleteNote(noteId);
+
+ // deleting the same note again will fail
+ try {
+ zeppelinClient.deleteNote(noteId);
+ fail("Should fail to delete non-existed note");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+ }
+ }
+
+ /**
+ * Exercises {@code executeParagraph}, whose return value is asserted on directly:
+ * a plain shell paragraph, a paragraph with a dynamic form default, the same paragraph with
+ * an explicit parameter, a failing shell command, an unknown interpreter, and an unknown
+ * paragraph id (must throw).
+ * The note created here is not deleted by this test.
+ */
+ @Test
+ public void testExecuteParagraph() throws Exception {
+ // run paragraph succeed
+ String noteId = zeppelinClient.createNote("/test/note_1");
+ String paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+ ParagraphResult paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.FINISHED, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertEquals("hello world\n", paragraphResult.getResults().get(0).getData());
+
+ // run paragraph succeed with dynamic forms: with no parameter given, the form default "abc" is used
+ paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello ${name=abc}'");
+ paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(paragraphResult.toString(), Status.FINISHED, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertEquals("hello abc\n", paragraphResult.getResults().get(0).getData());
+
+ // run paragraph succeed with parameters: the supplied value replaces the form default
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("name", "zeppelin");
+ paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId, parameters);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.FINISHED, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertEquals("hello zeppelin\n", paragraphResult.getResults().get(0).getData());
+
+ // run paragraph failed: two TEXT results are expected, the shell error and the exit value
+ paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+ paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.ERROR, paragraphResult.getStatus());
+ assertEquals(2, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("command not found"));
+ assertEquals("TEXT", paragraphResult.getResults().get(1).getType());
+ assertTrue(paragraphResult.getResults().get(1).getData(), paragraphResult.getResults().get(1).getData().contains("ExitValue"));
+
+ // run non-existent interpreter: reported as an ERROR result, not as an exception
+ paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%non_existed hello");
+ paragraphResult = zeppelinClient.executeParagraph(noteId, paragraphId);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.ERROR, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("Interpreter non_existed not found"));
+
+ // run non-existent paragraph: this one must throw
+ try {
+ zeppelinClient.executeParagraph(noteId, "invalid_paragraph_id");
+ fail("Should fail to run non-existed paragraph");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("No such paragraph"));
+ }
+ }
+
+ /**
+ * Same scenarios as the executeParagraph test, but each paragraph is started with
+ * {@code submitParagraph} and its result is obtained from {@code waitUtilParagraphFinish}
+ * with a timeout argument of 10 * 1000 (presumably milliseconds - confirm against the client API).
+ * The note created here is not deleted by this test.
+ */
+ @Test
+ public void testSubmitParagraph() throws Exception {
+ String noteId = zeppelinClient.createNote("/test/note_2");
+ String paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+ zeppelinClient.submitParagraph(noteId, paragraphId);
+ ParagraphResult paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.FINISHED, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertEquals("hello world\n", paragraphResult.getResults().get(0).getData());
+
+ // submit paragraph succeed with dynamic forms: the form default "abc" is used
+ paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello ${name=abc}'");
+ zeppelinClient.submitParagraph(noteId, paragraphId);
+ paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(paragraphResult.toString(), Status.FINISHED, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertEquals("hello abc\n", paragraphResult.getResults().get(0).getData());
+
+ // submit paragraph succeed with parameters: the supplied value replaces the form default
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("name", "zeppelin");
+ zeppelinClient.submitParagraph(noteId, paragraphId, parameters);
+ paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.FINISHED, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertEquals("hello zeppelin\n", paragraphResult.getResults().get(0).getData());
+
+ // submit paragraph failed: two TEXT results are expected, the shell error and the exit value
+ paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+ zeppelinClient.submitParagraph(noteId, paragraphId);
+ paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.ERROR, paragraphResult.getStatus());
+ assertEquals(2, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("command not found"));
+ assertEquals("TEXT", paragraphResult.getResults().get(1).getType());
+ assertTrue(paragraphResult.getResults().get(1).getData(), paragraphResult.getResults().get(1).getData().contains("ExitValue"));
+
+ // submit non-existent interpreter: reported as an ERROR result, not as an exception
+ paragraphId = zeppelinClient.addParagraph(noteId, "run sh", "%non_existed hello");
+ zeppelinClient.submitParagraph(noteId, paragraphId);
+ paragraphResult = zeppelinClient.waitUtilParagraphFinish(noteId, paragraphId, 10 * 1000);
+ assertEquals(paragraphId, paragraphResult.getParagraphId());
+ assertEquals(Status.ERROR, paragraphResult.getStatus());
+ assertEquals(1, paragraphResult.getResults().size());
+ assertEquals("TEXT", paragraphResult.getResults().get(0).getType());
+ assertTrue(paragraphResult.getResults().get(0).getData(), paragraphResult.getResults().get(0).getData().contains("Interpreter non_existed not found"));
+
+ // submit non-existent paragraph: this one must throw
+ try {
+ zeppelinClient.submitParagraph(noteId, "invalid_paragraph_id");
+ fail("Should fail to run non-existed paragraph");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("No such paragraph"));
+ }
+ }
+
+ /**
+ * Exercises {@code executeNote}, whose returned {@link NoteResult} is asserted on directly:
+ * unknown note id (must throw), a single-paragraph note, the paragraph updated to use a
+ * dynamic form, execution with an explicit parameter, and a three-paragraph note whose second
+ * paragraph fails so that the third is left unexecuted.
+ * The note created here is not deleted by this test.
+ */
+ @Test
+ public void testExecuteNote() throws Exception {
+ try {
+ zeppelinClient.executeNote("unknown_id");
+ fail("Should fail to submit non-existed note");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+ }
+ String noteId = zeppelinClient.createNote("/test/note_3");
+ String p0Id = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+ NoteResult noteResult = zeppelinClient.executeNote(noteId, new HashMap<>());
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(1, noteResult.getParagraphResultList().size());
+ ParagraphResult p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello world\n", p0.getResults().get(0).getData());
+
+ // update paragraph with dynamic forms: run without parameters, so the default "abc" is used
+ zeppelinClient.updateParagraph(noteId, p0Id, "run sh", "%sh echo 'hello ${name=abc}'");
+ noteResult = zeppelinClient.executeNote(noteId);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(1, noteResult.getParagraphResultList().size());
+ p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello abc\n", p0.getResults().get(0).getData());
+
+ // execute note with parameters: the supplied value replaces the form default
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("name", "zeppelin");
+ noteResult = zeppelinClient.executeNote(noteId, parameters);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(1, noteResult.getParagraphResultList().size());
+ p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+ // append a failing paragraph (p1) followed by a valid one (p2), then run the whole note
+ zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+ zeppelinClient.addParagraph(noteId, "run sh", "%sh pwd");
+ noteResult = zeppelinClient.executeNote(noteId, parameters);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(3, noteResult.getParagraphResultList().size());
+ p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+ ParagraphResult p1 = noteResult.getParagraphResultList().get(1);
+ assertEquals(Status.ERROR, p1.getStatus());
+ assertEquals("TEXT", p1.getResults().get(0).getType());
+ assertTrue(p1.getResults().get(0).getData(), p1.getResults().get(0).getData().contains("command not found"));
+ assertEquals("TEXT", p1.getResults().get(1).getType());
+ assertTrue(p1.getResults().get(1).getData(), p1.getResults().get(1).getData().contains("ExitValue"));
+
+ // p2 will be skipped because p1 fails: it stays READY and has no results.
+ ParagraphResult p2 = noteResult.getParagraphResultList().get(2);
+ assertEquals(Status.READY, p2.getStatus());
+ assertEquals(0, p2.getResults().size());
+ }
+
+ /**
+ * Exercises {@code submitNote} combined with {@code waitUntilNoteFinished}: unknown note id
+ * (must throw), a single-paragraph note, a slow paragraph whose note is still running right
+ * after submit, and a three-paragraph note whose second paragraph fails so that the third is
+ * left unexecuted. One step in the middle uses {@code executeNote} instead of submit.
+ * The note created here is not deleted by this test.
+ */
+ @Test
+ public void testSubmitNote() throws Exception {
+ try {
+ zeppelinClient.submitNote("unknown_id");
+ fail("Should fail to submit non-existed note");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("No such note"));
+ }
+ String noteId = zeppelinClient.createNote("/test/note_4");
+ String p0Id = zeppelinClient.addParagraph(noteId, "run sh", "%sh echo 'hello world'");
+ zeppelinClient.submitNote(noteId);
+ NoteResult noteResult = zeppelinClient.waitUntilNoteFinished(noteId);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(1, noteResult.getParagraphResultList().size());
+ ParagraphResult p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello world\n", p0.getResults().get(0).getData());
+
+ // update paragraph with dynamic forms; the added `sleep 5` keeps the paragraph busy so the
+ // NoteResult returned by submitNote can be asserted to be still running
+ zeppelinClient.updateParagraph(noteId, p0Id, "run sh", "%sh sleep 5\necho 'hello ${name=abc}'");
+ noteResult = zeppelinClient.submitNote(noteId);
+ assertEquals(true, noteResult.isRunning());
+ noteResult = zeppelinClient.waitUntilNoteFinished(noteId);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(1, noteResult.getParagraphResultList().size());
+ p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello abc\n", p0.getResults().get(0).getData());
+
+ // execute note with parameters
+ // NOTE(review): this step calls executeNote, not submitNote; submitNote with parameters is
+ // covered further below - confirm this is intentional.
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("name", "zeppelin");
+ noteResult = zeppelinClient.executeNote(noteId, parameters);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(1, noteResult.getParagraphResultList().size());
+ p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+ // append a failing paragraph (p1) followed by a valid one (p2), then submit the whole note
+ zeppelinClient.addParagraph(noteId, "run sh", "%sh invalid_command");
+ zeppelinClient.addParagraph(noteId, "run sh", "%sh pwd");
+ zeppelinClient.submitNote(noteId, parameters);
+ noteResult = zeppelinClient.waitUntilNoteFinished(noteId);
+ assertEquals(noteId, noteResult.getNoteId());
+ assertEquals(false, noteResult.isRunning());
+ assertEquals(3, noteResult.getParagraphResultList().size());
+ p0 = noteResult.getParagraphResultList().get(0);
+ assertEquals(Status.FINISHED, p0.getStatus());
+ assertEquals(1, p0.getResults().size());
+ assertEquals("TEXT", p0.getResults().get(0).getType());
+ assertEquals("hello zeppelin\n", p0.getResults().get(0).getData());
+
+ ParagraphResult p1 = noteResult.getParagraphResultList().get(1);
+ assertEquals(Status.ERROR, p1.getStatus());
+ assertEquals("TEXT", p1.getResults().get(0).getType());
+ assertTrue(p1.getResults().get(0).getData(), p1.getResults().get(0).getData().contains("command not found"));
+ assertEquals("TEXT", p1.getResults().get(1).getType());
+ assertTrue(p1.getResults().get(1).getData(), p1.getResults().get(1).getData().contains("ExitValue"));
+
+ // p2 will be skipped because p1 fails: it stays READY and has no results.
+ ParagraphResult p2 = noteResult.getParagraphResultList().get(2);
+ assertEquals(Status.READY, p2.getStatus());
+ assertEquals(0, p2.getResults().size());
+ }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientWithAuthIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientWithAuthIntegrationTest.java
new file mode 100644
index 0000000..9b0fc93
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientWithAuthIntegrationTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.NoteResult;
+import org.apache.zeppelin.client.ParagraphResult;
+import org.apache.zeppelin.client.Status;
+import org.apache.zeppelin.client.ZeppelinClient;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+ /**
+ * Integration tests for {@link ZeppelinClient} against a test server started through
+ * {@code AbstractTestRestApi.startUpWithAuthenticationEnable}.
+ *
+ * NOTE(review): every test shares the single static client created in {@link #setUp()}.
+ * {@code testCreateNoteWithoutLogin} presumably only holds while no earlier test has logged in
+ * through that client - confirm whether login state persists and whether test order matters.
+ */
+ public class ZeppelinClientWithAuthIntegrationTest extends AbstractTestRestApi {
+ private static Notebook notebook;
+
+ private static ClientConfig clientConfig;
+ private static ZeppelinClient zeppelinClient;
+
+ /**
+ * One-time fixture: sets the helium registry and allowed-origins system properties, starts
+ * the authentication-enabled test server and creates the shared client.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
+ "helium");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ALLOWED_ORIGINS.getVarName(), "*");
+
+ AbstractTestRestApi.startUpWithAuthenticationEnable(ZeppelinClientWithAuthIntegrationTest.class.getSimpleName());
+ notebook = TestUtils.getInstance(Notebook.class);
+
+ // assumes the test server listens on localhost:8080 - TODO confirm against AbstractTestRestApi
+ clientConfig = new ClientConfig("http://localhost:8080");
+ zeppelinClient = new ZeppelinClient(clientConfig);
+ }
+
+ /** One-time teardown: delegates to {@code AbstractTestRestApi.shutDown()}. */
+ @AfterClass
+ public static void destroy() throws Exception {
+ AbstractTestRestApi.shutDown();
+ }
+
+ /** Fetches the server version through the client and checks that a value is returned. */
+ @Test
+ public void testZeppelinVersion() throws Exception {
+ String version = zeppelinClient.getVersion();
+ // Without an assertion this test could only fail if getVersion() threw.
+ assertNotNull("Zeppelin version should not be null", version);
+ LOG.info("Zeppelin version: " + version);
+ }
+
+ /** Creating a note before logging in must fail with a message asking to log in first. */
+ @Test
+ public void testCreateNoteWithoutLogin() throws Exception {
+ try {
+ zeppelinClient.createNote("/note_1");
+ fail("Should fail due to not login");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("login first"));
+ }
+ }
+
+ /** After a successful login, creating a note must succeed and return a note id. */
+ @Test
+ public void testCreateNoteAfterLogin() throws Exception {
+ zeppelinClient.login("admin", "password1");
+ String noteId = zeppelinClient.createNote("/note_2");
+ // Check the returned id so the test verifies more than "no exception was thrown".
+ assertNotNull("createNote should return a note id", noteId);
+ }
+
+ /** Login with a wrong password or an unknown user must fail with a "Forbidden" message. */
+ @Test
+ public void testLoginFailed() throws Exception {
+ // wrong password
+ try {
+ zeppelinClient.login("admin", "invalid_password");
+ fail("Should fail to login");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("Forbidden"));
+ }
+
+ // wrong username
+ try {
+ zeppelinClient.login("invalid_user", "password1");
+ fail("Should fail to login");
+ } catch (Exception e) {
+ assertTrue(e.getMessage(), e.getMessage().contains("Forbidden"));
+ }
+ }
+ }
+
diff --git a/zeppelin-interpreter-integration/src/test/resources/log4j.properties b/zeppelin-interpreter-integration/src/test/resources/log4j.properties
index 773d0af..403c078 100644
--- a/zeppelin-interpreter-integration/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter-integration/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
#log4j.appender.stdout.layout.ConversionPattern=
#%5p [%t] (%F:%L) - %m%n
#%-4r [%t] %-5p %c %x - %m%n
@@ -42,3 +42,5 @@ log4j.logger.DataNucleus.Datastore=ERROR
# Log all JDBC parameters
log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.hadoop=WARN
+
+log4j.logger.org.apache.zeppelin.client=INFO
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
index 9474560..4800619 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContext.java
@@ -18,22 +18,24 @@
package org.apache.zeppelin.interpreter;
/**
- * Context info about running interpreter. This is used for deciding which interpreter binding
- * mode to use.
+ * Context info about running interpreter. This is used for deciding which
+ * interpreter process (InterpreterGroup) to bind to.
*/
public class ExecutionContext {
private final String user;
private final String noteId;
+ private final String interpreterGroupId;
private final String defaultInterpreterGroup;
private final boolean inIsolatedMode;
// When is the execution triggered, e.g. when the cron job is triggered or when the rest api is triggered.
private final String startTime;
- public ExecutionContext(String user, String noteId, String defaultInterpreterGroup,
+ public ExecutionContext(String user, String noteId, String interpreterGroupId, String defaultInterpreterGroup,
boolean inIsolatedMode, String startTime) {
this.user = user;
this.noteId = noteId;
+ this.interpreterGroupId = interpreterGroupId;
this.defaultInterpreterGroup = defaultInterpreterGroup;
this.inIsolatedMode = inIsolatedMode;
this.startTime = startTime;
@@ -47,6 +49,10 @@ public class ExecutionContext {
return noteId;
}
+ public String getInterpreterGroupId() {
+ return interpreterGroupId;
+ }
+
public String getDefaultInterpreterGroup() {
return defaultInterpreterGroup;
}
@@ -64,6 +70,7 @@ public class ExecutionContext {
return "ExecutionContext{" +
"user='" + user + '\'' +
", noteId='" + noteId + '\'' +
+ ", interpreterGroupId='" + interpreterGroupId + '\'' +
", defaultInterpreterGroup='" + defaultInterpreterGroup + '\'' +
", inIsolatedMode=" + inIsolatedMode +
", startTime=" + startTime +
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
index bf8be9c..86956d2 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ExecutionContextBuilder.java
@@ -23,6 +23,7 @@ package org.apache.zeppelin.interpreter;
public class ExecutionContextBuilder {
private String user;
private String noteId;
+ private String interpreterGroupId;
private String defaultInterpreterGroup = "";
private boolean inIsolatedMode = false;
private String startTime = "";
@@ -52,7 +53,13 @@ public class ExecutionContextBuilder {
return this;
}
+ public ExecutionContextBuilder setInterpreterGroupId(String interpreterGroupId) {
+ this.interpreterGroupId = interpreterGroupId;
+ return this;
+ }
+
public ExecutionContext createExecutionContext() {
- return new ExecutionContext(user, noteId, defaultInterpreterGroup, inIsolatedMode, startTime);
+ return new ExecutionContext(user, noteId, interpreterGroupId, defaultInterpreterGroup, inIsolatedMode, startTime);
}
+
}
\ No newline at end of file
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index 36ec1bd..54769be 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -69,7 +69,7 @@ import java.util.Set;
@Singleton
public class InterpreterRestApi {
- private static final Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterRestApi.class);
private final AuthenticationService authenticationService;
private final AuthorizationService authorizationService;
@@ -116,7 +116,7 @@ public class InterpreterRestApi {
return new JsonResponse<>(Status.OK, "", setting).build();
}
} catch (NullPointerException e) {
- logger.error("Exception in InterpreterRestApi while creating ", e);
+ LOGGER.error("Exception in InterpreterRestApi while creating ", e);
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
ExceptionUtils.getStackTrace(e)).build();
}
@@ -141,10 +141,10 @@ public class InterpreterRestApi {
InterpreterSetting interpreterSetting = interpreterSettingManager
.createNewSetting(request.getName(), request.getGroup(), request.getDependencies(),
request.getOption(), request.getProperties());
- logger.info("new setting created with {}", interpreterSetting.getId());
+ LOGGER.info("new setting created with {}", interpreterSetting.getId());
return new JsonResponse<>(Status.OK, "", interpreterSetting).build();
} catch (IOException e) {
- logger.error("Exception in InterpreterRestApi while creating ", e);
+ LOGGER.error("Exception in InterpreterRestApi while creating ", e);
return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
.build();
}
@@ -154,7 +154,7 @@ public class InterpreterRestApi {
@Path("setting/{settingId}")
@ZeppelinApi
public Response updateSetting(String message, @PathParam("settingId") String settingId) {
- logger.info("Update interpreterSetting {}", settingId);
+ LOGGER.info("Update interpreterSetting {}", settingId);
try {
UpdateInterpreterSettingRequest request =
@@ -163,11 +163,11 @@ public class InterpreterRestApi {
.setPropertyAndRestart(settingId, request.getOption(), request.getProperties(),
request.getDependencies());
} catch (InterpreterException e) {
- logger.error("Exception in InterpreterRestApi while updateSetting ", e);
+ LOGGER.error("Exception in InterpreterRestApi while updateSetting ", e);
return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
.build();
} catch (IOException e) {
- logger.error("Exception in InterpreterRestApi while updateSetting ", e);
+ LOGGER.error("Exception in InterpreterRestApi while updateSetting ", e);
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
ExceptionUtils.getStackTrace(e)).build();
}
@@ -185,7 +185,7 @@ public class InterpreterRestApi {
@Path("setting/{settingId}")
@ZeppelinApi
public Response removeSetting(@PathParam("settingId") String settingId) throws IOException {
- logger.info("Remove interpreterSetting {}", settingId);
+ LOGGER.info("Remove interpreterSetting {}", settingId);
interpreterSettingManager.remove(settingId);
return new JsonResponse(Status.OK).build();
}
@@ -197,7 +197,7 @@ public class InterpreterRestApi {
@Path("setting/restart/{settingId}")
@ZeppelinApi
public Response restartSetting(String message, @PathParam("settingId") String settingId) {
- logger.info("Restart interpreterSetting {}, msg={}", settingId, message);
+ LOGGER.info("Restart interpreterSetting {}, msg={}", settingId, message);
InterpreterSetting setting = interpreterSettingManager.get(settingId);
try {
@@ -224,7 +224,7 @@ public class InterpreterRestApi {
}
}
} catch (InterpreterException e) {
- logger.error("Exception in InterpreterRestApi while restartSetting ", e);
+ LOGGER.error("Exception in InterpreterRestApi while restartSetting ", e);
return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
.build();
}
@@ -268,9 +268,9 @@ public class InterpreterRestApi {
Repository request = Repository.fromJson(message);
interpreterSettingManager.addRepository(request.getId(), request.getUrl(),
request.isSnapshot(), request.getAuthentication(), request.getProxy());
- logger.info("New repository {} added", request.getId());
+ LOGGER.info("New repository {} added", request.getId());
} catch (Exception e) {
- logger.error("Exception in InterpreterRestApi while adding repository ", e);
+ LOGGER.error("Exception in InterpreterRestApi while adding repository ", e);
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
ExceptionUtils.getStackTrace(e)).build();
}
@@ -286,11 +286,11 @@ public class InterpreterRestApi {
@Path("repository/{repoId}")
@ZeppelinApi
public Response removeRepository(@PathParam("repoId") String repoId) {
- logger.info("Remove repository {}", repoId);
+ LOGGER.info("Remove repository {}", repoId);
try {
interpreterSettingManager.removeRepository(repoId);
} catch (Exception e) {
- logger.error("Exception in InterpreterRestApi while removing repository ", e);
+ LOGGER.error("Exception in InterpreterRestApi while removing repository ", e);
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
ExceptionUtils.getStackTrace(e)).build();
}
@@ -311,7 +311,7 @@ public class InterpreterRestApi {
@Path("install")
@ZeppelinApi
public Response installInterpreter(@NotNull String message) {
- logger.info("Install interpreter: {}", message);
+ LOGGER.info("Install interpreter: {}", message);
InterpreterInstallationRequest request = InterpreterInstallationRequest.fromJson(message);
try {
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index 52f4acf..3009676 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -37,6 +37,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
@@ -128,7 +129,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response getNotePermissions(@PathParam("noteId") String noteId) throws IOException {
checkIfUserIsAnon(getBlockNotAuthenticatedUserErrorMsg());
checkIfUserCanRead(noteId,
- "Insufficient privileges you cannot get the list of permissions for this note");
+ "Insufficient privileges you cannot get the list of permissions for this note");
HashMap<String, Set<String>> permissionsMap = new HashMap<>();
permissionsMap.put("owners", authorizationService.getOwners(noteId));
permissionsMap.put("readers", authorizationService.getReaders(noteId));
@@ -241,6 +242,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response putNotePermissions(@PathParam("noteId") String noteId, String req)
throws IOException {
+
String principal = authenticationService.getPrincipal();
Set<String> roles = authenticationService.getAssociatedRoles();
HashSet<String> userAndRoles = new HashSet<>();
@@ -249,11 +251,11 @@ public class NotebookRestApi extends AbstractRestApi {
checkIfUserIsAnon(getBlockNotAuthenticatedUserErrorMsg());
checkIfUserIsOwner(noteId,
- ownerPermissionError(userAndRoles, authorizationService.getOwners(noteId)));
+ ownerPermissionError(userAndRoles, authorizationService.getOwners(noteId)));
HashMap<String, HashSet<String>> permMap =
- GSON.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() {
- }.getType());
+ GSON.fromJson(req, new TypeToken<HashMap<String, HashSet<String>>>() {
+ }.getType());
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -317,7 +319,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getNoteList() throws IOException {
List<NoteInfo> notesInfo = notebookService.listNotesInfo(false, getServiceContext(),
- new RestServiceCallback<List<NoteInfo>>());
+ new RestServiceCallback<List<NoteInfo>>());
return new JsonResponse<>(Status.OK, "", notesInfo).build();
}
@@ -333,7 +335,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getNote(@PathParam("noteId") String noteId) throws IOException {
Note note =
- notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback());
+ notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback());
return new JsonResponse<>(Status.OK, "", note).build();
}
@@ -365,13 +367,9 @@ public class NotebookRestApi extends AbstractRestApi {
@Path("import")
@ZeppelinApi
public Response importNote(@QueryParam("notePath") String notePath, String noteJson) throws IOException {
- try {
- Note note = notebookService.importNote(notePath, noteJson, getServiceContext(),
- new RestServiceCallback());
- return new JsonResponse<>(Status.OK, "", note.getId()).build();
- } catch (IOException e) {
- return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage()).build();
- }
+ Note note = notebookService.importNote(notePath, noteJson, getServiceContext(),
+ new RestServiceCallback());
+ return new JsonResponse<>(Status.OK, "", note.getId()).build();
}
/**
@@ -385,13 +383,18 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response createNote(String message) throws IOException {
String user = authenticationService.getPrincipal();
- LOGGER.info("Create new note by JSON {}", message);
+ LOGGER.info("Creating new note by JSON {}", message);
NewNoteRequest request = NewNoteRequest.fromJson(message);
+ String defaultInterpreterGroup = request.getDefaultInterpreterGroup();
+ if (StringUtils.isBlank(defaultInterpreterGroup)) {
+ defaultInterpreterGroup = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT);
+ }
Note note = notebookService.createNote(
- request.getName(),
- zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT),
- getServiceContext(),
- new RestServiceCallback<>());
+ request.getName(),
+ defaultInterpreterGroup,
+ false,
+ getServiceContext(),
+ new RestServiceCallback<>());
AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal());
if (request.getParagraphs() != null) {
for (NewParagraphRequest paragraphRequest : request.getParagraphs()) {
@@ -399,7 +402,7 @@ public class NotebookRestApi extends AbstractRestApi {
initParagraph(p, paragraphRequest, user);
}
}
- return new JsonResponse<>(Status.OK, "", note.getId()).build();
+ return new JsonResponse(Status.OK, "", note.getId()).build();
}
/**
@@ -415,13 +418,13 @@ public class NotebookRestApi extends AbstractRestApi {
public Response deleteNote(@PathParam("noteId") String noteId) throws IOException {
LOGGER.info("Delete note {} ", noteId);
notebookService.removeNote(noteId,
- getServiceContext(),
- new RestServiceCallback<String>() {
- @Override
- public void onSuccess(String message, ServiceContext context) {
- notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
- }
- });
+ getServiceContext(),
+ new RestServiceCallback<String>() {
+ @Override
+ public void onSuccess(String message, ServiceContext context) {
+ notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ }
+ });
return new JsonResponse<>(Status.OK, "").build();
}
@@ -440,6 +443,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response cloneNote(@PathParam("noteId") String noteId, String message)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Clone note by JSON {}", message);
checkIfUserCanWrite(noteId, "Insufficient privileges you cannot clone this note");
NewNoteRequest request = NewNoteRequest.fromJson(message);
@@ -449,13 +453,13 @@ public class NotebookRestApi extends AbstractRestApi {
}
AuthenticationInfo subject = new AuthenticationInfo(authenticationService.getPrincipal());
Note newNote = notebookService.cloneNote(noteId, newNoteName, getServiceContext(),
- new RestServiceCallback<Note>(){
- @Override
- public void onSuccess(Note newNote, ServiceContext context) throws IOException {
- notebookServer.broadcastNote(newNote);
- notebookServer.broadcastNoteList(subject, context.getUserAndRoles());
- }
- });
+ new RestServiceCallback<Note>() {
+ @Override
+ public void onSuccess(Note newNote, ServiceContext context) throws IOException {
+ notebookServer.broadcastNote(newNote);
+ notebookServer.broadcastNoteList(subject, context.getUserAndRoles());
+ }
+ });
return new JsonResponse<>(Status.OK, "", newNote.getId()).build();
}
@@ -471,6 +475,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response renameNote(@PathParam("noteId") String noteId,
String message) throws IOException {
+
LOGGER.info("Rename note by JSON {}", message);
RenameNoteRequest request = GSON.fromJson(message, RenameNoteRequest.class);
String newName = request.getName();
@@ -479,13 +484,13 @@ public class NotebookRestApi extends AbstractRestApi {
throw new BadRequestException("name can not be empty");
}
notebookService.renameNote(noteId, request.getName(), false, getServiceContext(),
- new RestServiceCallback<Note>(){
- @Override
- public void onSuccess(Note note, ServiceContext context) throws IOException {
- notebookServer.broadcastNote(note);
- notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
- }
- });
+ new RestServiceCallback<Note>() {
+ @Override
+ public void onSuccess(Note note, ServiceContext context) throws IOException {
+ notebookServer.broadcastNote(note);
+ notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles());
+ }
+ });
return new JsonResponse(Status.OK, "").build();
}
@@ -501,6 +506,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response insertParagraph(@PathParam("noteId") String noteId, String message)
throws IOException {
+
String user = authenticationService.getPrincipal();
LOGGER.info("Insert paragraph {} {}", noteId, message);
@@ -535,6 +541,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId) throws IOException {
+
LOGGER.info("Get paragraph {} {}", noteId, paragraphId);
Note note = notebook.getNote(noteId);
@@ -542,7 +549,6 @@ public class NotebookRestApi extends AbstractRestApi {
checkIfUserCanRead(noteId, "Insufficient privileges you cannot get this paragraph");
Paragraph p = note.getParagraph(paragraphId);
checkIfParagraphIsNotNull(p);
-
return new JsonResponse<>(Status.OK, "", p).build();
}
@@ -558,9 +564,9 @@ public class NotebookRestApi extends AbstractRestApi {
public Response updateParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId,
String message) throws IOException {
+
String user = authenticationService.getPrincipal();
LOGGER.info("{} will update paragraph {} {}", user, noteId, paragraphId);
-
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
checkIfUserCanWrite(noteId, "Insufficient privileges you cannot update this paragraph");
@@ -595,6 +601,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response updateParagraphConfig(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId,
String message) throws IOException {
+
String user = authenticationService.getPrincipal();
LOGGER.info("{} will update paragraph config {} {}", user, noteId, paragraphId);
@@ -625,17 +632,17 @@ public class NotebookRestApi extends AbstractRestApi {
@PathParam("paragraphId") String paragraphId,
@PathParam("newIndex") String newIndex)
throws IOException {
+
LOGGER.info("Move paragraph {} {} {}", noteId, paragraphId, newIndex);
notebookService.moveParagraph(noteId, paragraphId, Integer.parseInt(newIndex),
- getServiceContext(),
- new RestServiceCallback<Paragraph>() {
- @Override
- public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
- notebookServer.broadcastNote(result.getNote());
- }
- });
+ getServiceContext(),
+ new RestServiceCallback<Paragraph>() {
+ @Override
+ public void onSuccess(Paragraph result, ServiceContext context) throws IOException {
+ notebookServer.broadcastNote(result.getNote());
+ }
+ });
return new JsonResponse(Status.OK, "").build();
-
}
/**
@@ -650,18 +657,30 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response deleteParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId) throws IOException {
+
LOGGER.info("Delete paragraph {} {}", noteId, paragraphId);
notebookService.removeParagraph(noteId, paragraphId, getServiceContext(),
- new RestServiceCallback<Paragraph>() {
- @Override
- public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
- notebookServer.broadcastNote(p.getNote());
- }
- });
+ new RestServiceCallback<Paragraph>() {
+ @Override
+ public void onSuccess(Paragraph p, ServiceContext context) throws IOException {
+ notebookServer.broadcastNote(p.getNote());
+ }
+ });
return new JsonResponse(Status.OK, "").build();
}
+ @POST
+ @Path("{noteId}/paragraph/next")
+ public Response nextSessionParagraph(@PathParam("noteId") String noteId,
+ @QueryParam("maxParagraph") int maxParagraph) throws IOException {
+
+ Paragraph p = notebookService.getNextSessionParagraph(noteId, maxParagraph,
+ getServiceContext(),
+ new RestServiceCallback<>());
+ return new JsonResponse(Status.OK, p.getId()).build();
+ }
+
/**
* Clear result of all paragraphs REST API.
*
@@ -675,7 +694,7 @@ public class NotebookRestApi extends AbstractRestApi {
throws IOException {
LOGGER.info("Clear all paragraph output of note {}", noteId);
notebookService.clearAllParagraphOutput(noteId, getServiceContext(),
- new RestServiceCallback<>());
+ new RestServiceCallback<>());
return new JsonResponse(Status.OK, "").build();
}
@@ -697,7 +716,8 @@ public class NotebookRestApi extends AbstractRestApi {
@QueryParam("blocking") Boolean blocking,
@QueryParam("isolated") Boolean isolated,
String message)
- throws IOException, IllegalArgumentException {
+ throws Exception, IllegalArgumentException {
+
if (blocking == null) {
blocking = false;
}
@@ -719,13 +739,8 @@ public class NotebookRestApi extends AbstractRestApi {
checkIfUserCanRun(noteId, "Insufficient privileges you cannot run job for this note");
//TODO(zjffdu), can we run a note via rest api when cron is enabled ?
- try {
- note.runAll(subject, blocking, isolated, params);
- return new JsonResponse<>(Status.OK).build();
- } catch (Exception ex) {
- LOGGER.error("Exception from run", ex);
- return new JsonResponse<>(Status.EXPECTATION_FAILED, ex.getMessage()).build();
- }
+ note.runAll(subject, blocking, isolated, params);
+ return new JsonResponse<>(Status.OK).build();
}
/**
@@ -741,6 +756,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response stopNoteJobs(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Stop note jobs {} ", noteId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -767,6 +783,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getNoteJobStatus(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Get note job status.");
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -790,6 +807,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response getNoteParagraphJobStatus(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Get note paragraph job status.");
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
@@ -814,8 +832,11 @@ public class NotebookRestApi extends AbstractRestApi {
@Path("job/{noteId}/{paragraphId}")
@ZeppelinApi
public Response runParagraph(@PathParam("noteId") String noteId,
- @PathParam("paragraphId") String paragraphId, String message)
+ @PathParam("paragraphId") String paragraphId,
+ @QueryParam("sessionId") String sessionId,
+ String message)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Run paragraph job asynchronously {} {} {}", noteId, paragraphId, message);
Note note = notebook.getNote(noteId);
@@ -826,11 +847,11 @@ public class NotebookRestApi extends AbstractRestApi {
Map<String, Object> params = new HashMap<>();
if (!StringUtils.isEmpty(message)) {
ParametersRequest request =
- ParametersRequest.fromJson(message);
+ ParametersRequest.fromJson(message);
params = request.getParams();
}
notebookService.runParagraph(noteId, paragraphId, paragraph.getTitle(),
- paragraph.getText(), params, new HashMap<>(),
+ paragraph.getText(), params, new HashMap<>(), sessionId,
false, false, getServiceContext(), new RestServiceCallback<>());
return new JsonResponse<>(Status.OK).build();
}
@@ -851,6 +872,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response runParagraphSynchronously(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId,
+ @QueryParam("sessionId") String sessionId,
String message)
throws IOException, IllegalArgumentException {
LOGGER.info("Run paragraph synchronously {} {} {}", noteId, paragraphId, message);
@@ -863,21 +885,17 @@ public class NotebookRestApi extends AbstractRestApi {
Map<String, Object> params = new HashMap<>();
if (!StringUtils.isEmpty(message)) {
ParametersRequest request =
- ParametersRequest.fromJson(message);
+ ParametersRequest.fromJson(message);
params = request.getParams();
}
if (notebookService.runParagraph(noteId, paragraphId, paragraph.getTitle(),
- paragraph.getText(), params,
- new HashMap<>(), false, true, getServiceContext(), new RestServiceCallback<>())) {
+ paragraph.getText(), params,
+ new HashMap<>(), sessionId, false, true, getServiceContext(), new RestServiceCallback<>())) {
note = notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback<>());
Paragraph p = note.getParagraph(paragraphId);
InterpreterResult result = p.getReturn();
- if (result.code() == InterpreterResult.Code.SUCCESS) {
- return new JsonResponse<>(Status.OK, result).build();
- } else {
- return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, result).build();
- }
+ return new JsonResponse<>(Status.OK, result).build();
} else {
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, "Fail to run paragraph").build();
}
@@ -900,7 +918,7 @@ public class NotebookRestApi extends AbstractRestApi {
throws IOException, IllegalArgumentException {
LOGGER.info("stop paragraph job {} ", noteId);
notebookService.cancelParagraph(noteId, paragraphId, getServiceContext(),
- new RestServiceCallback<Paragraph>());
+ new RestServiceCallback<Paragraph>());
return new JsonResponse<>(Status.OK).build();
}
@@ -917,6 +935,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response registerCronJob(@PathParam("noteId") String noteId, String message)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Register cron job note={} request cron msg={}", noteId, message);
CronRequest request = CronRequest.fromJson(message);
@@ -952,12 +971,13 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response removeCronJob(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Remove cron job note {}", noteId);
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note);
checkIfUserIsOwner(noteId,
- "Insufficient privileges you cannot remove this cron job from this note");
+ "Insufficient privileges you cannot remove this cron job from this note");
checkIfNoteSupportsCron(note);
Map<String, Object> config = note.getConfig();
@@ -982,6 +1002,7 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getCronJob(@PathParam("noteId") String noteId)
throws IOException, IllegalArgumentException {
+
LOGGER.info("Get cron job note {}", noteId);
Note note = notebook.getNote(noteId);
@@ -1008,7 +1029,7 @@ public class NotebookRestApi extends AbstractRestApi {
public Response getJobListforNote() throws IOException, IllegalArgumentException {
LOGGER.info("Get note jobs for job manager");
List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService
- .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
+ .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>());
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", noteJobs);
@@ -1031,8 +1052,8 @@ public class NotebookRestApi extends AbstractRestApi {
throws IOException, IllegalArgumentException {
LOGGER.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime);
List<JobManagerService.NoteJobInfo> noteJobs =
- jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
- new RestServiceCallback<>());
+ jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(),
+ new RestServiceCallback<>());
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", noteJobs);
@@ -1057,9 +1078,9 @@ public class NotebookRestApi extends AbstractRestApi {
String[] ids = notesFound.get(i).get("id").split("/", 2);
String noteId = ids[0];
if (!authorizationService.isOwner(noteId, userAndRoles) &&
- !authorizationService.isReader(noteId, userAndRoles) &&
- !authorizationService.isWriter(noteId, userAndRoles) &&
- !authorizationService.isRunner(noteId, userAndRoles)) {
+ !authorizationService.isReader(noteId, userAndRoles) &&
+ !authorizationService.isWriter(noteId, userAndRoles) &&
+ !authorizationService.isRunner(noteId, userAndRoles)) {
notesFound.remove(i);
i--;
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java
new file mode 100644
index 0000000..342374f
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.rest;
+
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.message.SessionInfo;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Backend manager of ZSessions.
+ *
+ * <p>Keeps an in-memory registry of sessions keyed by session id. Access to the registry is
+ * guarded by this object's monitor, so the manager can be shared between request threads.
+ * The {@link SessionInfo} objects handed out are the registered instances themselves and are
+ * refreshed in place by {@link #getSession(String)}.
+ */
+public class SessionManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+
+  /** Maximum number of attempts made to generate an unused session id. */
+  private static final int RETRY = 3;
+
+  /** Registry of known sessions keyed by session id; guarded by {@code this}. */
+  private final Map<String, SessionInfo> sessions = new HashMap<>();
+  private final InterpreterSettingManager interpreterSettingManager;
+  private final Notebook notebook;
+
+  public SessionManager(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
+    this.notebook = notebook;
+    this.interpreterSettingManager = interpreterSettingManager;
+  }
+
+  /**
+   * Create a new session: allocate a new session id and create a dedicated note for it.
+   *
+   * <p>The session id has the form {@code <interpreter>_<currentTimeMillis>}. If that id is
+   * already registered, the method waits one millisecond and tries again, up to
+   * {@link #RETRY} attempts in total.
+   *
+   * @param interpreter name of the interpreter the session is created for
+   * @return the registered session info, carrying the session id and the id of the new note
+   * @throws Exception if no unused session id could be generated, if the thread is interrupted
+   *                   while waiting, or if creating the note fails
+   */
+  public synchronized SessionInfo createSession(String interpreter) throws Exception {
+    String sessionId = null;
+    for (int attempt = 0; attempt < RETRY; attempt++) {
+      String candidate = interpreter + "_" + System.currentTimeMillis();
+      if (!sessions.containsKey(candidate)) {
+        sessionId = candidate;
+        break;
+      }
+      // Id collision within the same millisecond: let the clock advance before retrying.
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status for the caller instead of swallowing it.
+        Thread.currentThread().interrupt();
+        throw new Exception("Interrupted while generating session id", e);
+      }
+    }
+
+    if (sessionId == null) {
+      throw new Exception("Unable to generate session id");
+    }
+
+    Note sessionNote = notebook.createNote(
+        "/_ZSession/" + interpreter + "/" + sessionId, AuthenticationInfo.ANONYMOUS);
+    SessionInfo sessionInfo = new SessionInfo(sessionId, sessionNote.getId(), interpreter);
+    sessions.put(sessionId, sessionInfo);
+    return sessionInfo;
+  }
+
+  /**
+   * Remove and stop this session.
+   *
+   * <p>The session is dropped from the registry first. If an interpreter group exists for the
+   * session id, its interpreters for this session are closed; otherwise nothing else happens.
+   * The note created for the session is not removed by this method.
+   *
+   * @param sessionId id of the session to remove
+   */
+  public void removeSession(String sessionId) {
+    // Only the registry update needs the lock; closing interpreters happens outside of it.
+    synchronized (this) {
+      sessions.remove(sessionId);
+    }
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(sessionId);
+    if (interpreterGroup == null) {
+      LOGGER.info("No interpreterGroup for session: {}", sessionId);
+      return;
+    }
+    ((ManagedInterpreterGroup) interpreterGroup).getInterpreterSetting()
+        .closeInterpreters(sessionId);
+  }
+
+  /**
+   * Get the sessionInfo.
+   *
+   * <p>This method also refreshes the session's state if there is an interpreter group
+   * associated with the session id: the state becomes {@code "Ready"} when the group has no
+   * remote interpreter process yet, otherwise start time and web url are updated and the state
+   * becomes {@code "Running"} or {@code "Stopped"} depending on whether the process is running.
+   * Without an interpreter group the stored info is returned unchanged.
+   *
+   * @param sessionId id of the session to look up
+   * @return the session info, or {@code null} if no such session is registered
+   * @throws Exception declared for callers; nothing in this method throws it explicitly
+   */
+  public SessionInfo getSession(String sessionId) throws Exception {
+    SessionInfo sessionInfo;
+    synchronized (this) {
+      sessionInfo = sessions.get(sessionId);
+    }
+    if (sessionInfo == null) {
+      LOGGER.warn("No such session: {}", sessionId);
+      return null;
+    }
+
+    InterpreterGroup interpreterGroup =
+        interpreterSettingManager.getInterpreterGroupById(sessionId);
+    if (interpreterGroup == null) {
+      return sessionInfo;
+    }
+
+    RemoteInterpreterProcess remoteInterpreterProcess =
+        ((ManagedInterpreterGroup) interpreterGroup).getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null) {
+      sessionInfo.setState("Ready");
+    } else {
+      sessionInfo.setStartTime(remoteInterpreterProcess.getStartTime());
+      sessionInfo.setWeburl(interpreterGroup.getWebUrl());
+      sessionInfo.setState(remoteInterpreterProcess.isRunning() ? "Running" : "Stopped");
+    }
+    return sessionInfo;
+  }
+
+  /**
+   * Get all sessions, each refreshed through {@link #getSession(String)}.
+   *
+   * @return list of all sessions that are still registered; never {@code null}
+   * @throws Exception propagated from {@link #getSession(String)}
+   */
+  public List<SessionInfo> getAllSessions() throws Exception {
+    // Iterate over a snapshot so that concurrent create/remove calls cannot invalidate the
+    // iteration.
+    List<String> sessionIds;
+    synchronized (this) {
+      sessionIds = new ArrayList<>(sessions.keySet());
+    }
+    List<SessionInfo> sessionList = new ArrayList<>(sessionIds.size());
+    for (String sessionId : sessionIds) {
+      SessionInfo session = getSession(sessionId);
+      // A session may have been removed after the snapshot was taken.
+      if (session != null) {
+        sessionList.add(session);
+      }
+    }
+    return sessionList;
+  }
+
+  /**
+   * Get all sessions for the provided interpreter.
+   *
+   * @param interpreter interpreter name to filter by; must not be {@code null}
+   * @return list of sessions whose interpreter equals {@code interpreter}; never {@code null}
+   * @throws Exception propagated from {@link #getSession(String)}
+   */
+  public List<SessionInfo> getAllSessions(String interpreter) throws Exception {
+    List<SessionInfo> sessionList = new ArrayList<>();
+    for (SessionInfo session : getAllSessions()) {
+      if (interpreter.equals(session.getInterpreter())) {
+        sessionList.add(session);
+      }
+    }
+    return sessionList;
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java
new file mode 100644
index 0000000..8c5b164
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.message.SessionInfo;
+import org.apache.zeppelin.server.JsonResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import java.util.List;
+
+/**
+ * Rest api endpoint for the ZSession: list, create, stop and inspect sessions.
+ * All operations are delegated to a {@link SessionManager} owned by this endpoint.
+ */
+@Path("/session")
+@Produces("application/json")
+@Singleton
+public class SessionRestApi {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SessionRestApi.class);
+
+  private final SessionManager sessionManager;
+
+  @Inject
+  public SessionRestApi(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
+    this.sessionManager = new SessionManager(notebook, interpreterSettingManager);
+  }
+
+  /**
+   * List sessions, optionally restricted to one interpreter.
+   *
+   * @param interpreter interpreter name to filter by; when blank or absent all sessions are
+   *                    returned
+   * @return OK response whose body is the list of sessions
+   */
+  @GET
+  @Path("/")
+  public Response listSessions(@QueryParam("interpreter") String interpreter) throws Exception {
+    List<SessionInfo> sessionList;
+    if (StringUtils.isBlank(interpreter)) {
+      LOGGER.info("List all sessions");
+      sessionList = sessionManager.getAllSessions();
+    } else {
+      LOGGER.info("List all sessions for interpreter: {}", interpreter);
+      sessionList = sessionManager.getAllSessions(interpreter);
+    }
+    return new JsonResponse<>(Response.Status.OK, sessionList).build();
+  }
+
+  /**
+   * Create a new session for the given interpreter.
+   *
+   * @param interpreter interpreter name; required
+   * @return OK response whose body is the new session info, or BAD_REQUEST when the
+   *         {@code interpreter} query parameter is blank or absent
+   */
+  @POST
+  @Path("/")
+  public Response createSession(@QueryParam("interpreter") String interpreter) throws Exception {
+    // Without this check a missing parameter would create a session named "null_<timestamp>".
+    if (StringUtils.isBlank(interpreter)) {
+      LOGGER.warn("Unable to create session: no interpreter specified");
+      return new JsonResponse<>(Response.Status.BAD_REQUEST,
+          "Query parameter 'interpreter' is required").build();
+    }
+    LOGGER.info("Create new session for interpreter: {}", interpreter);
+    SessionInfo sessionInfo = sessionManager.createSession(interpreter);
+    return new JsonResponse<>(Response.Status.OK, sessionInfo).build();
+  }
+
+  /**
+   * Stop and remove the session with the provided sessionId.
+   *
+   * @param sessionId id of the session to stop
+   * @return OK response carrying the session id
+   */
+  @DELETE
+  @Path("{sessionId}")
+  public Response stopSession(@PathParam("sessionId") String sessionId) {
+    LOGGER.info("Stop session: {}", sessionId);
+    sessionManager.removeSession(sessionId);
+    return new JsonResponse<>(Response.Status.OK, sessionId).build();
+  }
+
+  /**
+   * Get session info for provided sessionId.
+   *
+   * @param sessionId id of the session to look up
+   * @return OK response whose body is the session info, or NOT_FOUND when the session manager
+   *         knows no such session
+   */
+  @GET
+  @Path("{sessionId}")
+  @ZeppelinApi
+  public Response getSession(@PathParam("sessionId") String sessionId) throws Exception {
+    SessionInfo session = sessionManager.getSession(sessionId);
+    if (session == null) {
+      return new JsonResponse<>(Response.Status.NOT_FOUND).build();
+    }
+    return new JsonResponse<>(Response.Status.OK, "", session).build();
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
index 79e3fe0..5615f87 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
@@ -23,10 +23,11 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
+
import org.apache.zeppelin.rest.message.gson.ExceptionSerializer;
@Provider
-public class WebApplicationExceptionMapper implements ExceptionMapper<WebApplicationException> {
+public class WebApplicationExceptionMapper implements ExceptionMapper<Throwable> {
private final Gson gson;
public WebApplicationExceptionMapper() {
@@ -37,8 +38,11 @@ public class WebApplicationExceptionMapper implements ExceptionMapper<WebApplica
}
@Override
- public Response toResponse(WebApplicationException exception) {
- return Response.status(exception.getResponse().getStatus())
- .entity(gson.toJson(exception)).build();
+ public Response toResponse(Throwable exception) {
+ if (exception instanceof WebApplicationException) {
+ return ((WebApplicationException) exception).getResponse();
+ } else {
+ return Response.status(500).entity(gson.toJson(exception)).build();
+ }
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
index 9cf1df1..7d6a3a6 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewNoteRequest.java
@@ -26,10 +26,12 @@ import org.apache.zeppelin.common.JsonSerializable;
* NewNoteRequest rest api request message.
*/
public class NewNoteRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
+ private static final Gson GSON = new Gson();
- String name;
- List<NewParagraphRequest> paragraphs;
+ //TODO(zjffdu) rename it to be notePath instead of name
+ private String name;
+ private String defaultInterpreterGroup;
+ private List<NewParagraphRequest> paragraphs;
public NewNoteRequest (){
}
@@ -38,15 +40,19 @@ public class NewNoteRequest implements JsonSerializable {
return name;
}
+ public String getDefaultInterpreterGroup() {
+ return defaultInterpreterGroup;
+ }
+
public List<NewParagraphRequest> getParagraphs() {
return paragraphs;
}
public String toJson() {
- return gson.toJson(this);
+ return GSON.toJson(this);
}
public static NewNoteRequest fromJson(String json) {
- return gson.fromJson(json, NewNoteRequest.class);
+ return GSON.fromJson(json, NewNoteRequest.class);
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
index 8a6d0d0..f82cb9e 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/RestartInterpreterRequest.java
@@ -24,9 +24,9 @@ import org.apache.zeppelin.common.JsonSerializable;
* RestartInterpreter rest api request message.
*/
public class RestartInterpreterRequest implements JsonSerializable {
- private static final Gson gson = new Gson();
+ private static final Gson GSON = new Gson();
- String noteId;
+ private String noteId;
public RestartInterpreterRequest() {
}
@@ -36,10 +36,10 @@ public class RestartInterpreterRequest implements JsonSerializable {
}
public String toJson() {
- return gson.toJson(this);
+ return GSON.toJson(this);
}
public static RestartInterpreterRequest fromJson(String json) {
- return gson.fromJson(json, RestartInterpreterRequest.class);
+ return GSON.fromJson(json, RestartInterpreterRequest.class);
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/SessionInfo.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/SessionInfo.java
new file mode 100644
index 0000000..8e8f2ab
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/SessionInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.message;
+
+/**
+ * Message object describing a ZSession: its id, the note backing it, the interpreter it
+ * belongs to and the mutable runtime details (state, web url, start time).
+ *
+ * (TODO) Move it to zeppelin-common, so that we don't need to have 2 copies.
+ */
+public class SessionInfo {
+
+  private String sessionId;
+  private String noteId;
+  private String interpreter;
+  private String state;
+  private String weburl;
+  private String startTime;
+
+  /** Create a session info that only knows its session id; all other fields are null. */
+  public SessionInfo(String sessionId) {
+    this(sessionId, null, null);
+  }
+
+  /** Create a session info without runtime details; state, weburl and startTime are null. */
+  public SessionInfo(String sessionId, String noteId, String interpreter) {
+    this(sessionId, noteId, interpreter, null, null, null);
+  }
+
+  /** Create a fully populated session info. */
+  public SessionInfo(String sessionId,
+                     String noteId,
+                     String interpreter,
+                     String state,
+                     String weburl,
+                     String startTime) {
+    this.sessionId = sessionId;
+    this.noteId = noteId;
+    this.interpreter = interpreter;
+    this.state = state;
+    this.weburl = weburl;
+    this.startTime = startTime;
+  }
+
+  // ---- fixed at construction time ----
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public String getNoteId() {
+    return noteId;
+  }
+
+  public String getInterpreter() {
+    return interpreter;
+  }
+
+  // ---- runtime details, may be updated after construction ----
+
+  public String getState() {
+    return state;
+  }
+
+  public void setState(String state) {
+    this.state = state;
+  }
+
+  public String getWeburl() {
+    return weburl;
+  }
+
+  public void setWeburl(String weburl) {
+    this.weburl = weburl;
+  }
+
+  public String getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(String startTime) {
+    this.startTime = startTime;
+  }
+}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java
index ac654c0..78e6101 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/gson/ExceptionSerializer.java
@@ -21,6 +21,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
import java.lang.reflect.Type;
import javax.ws.rs.WebApplicationException;
@@ -32,6 +34,7 @@ public class ExceptionSerializer implements JsonSerializer<Exception> {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("exception", e.getClass().getSimpleName());
jsonObject.addProperty("message", e.getMessage());
+ jsonObject.addProperty("stacktrace", ExceptionUtils.getStackTrace(e));
if (e instanceof WebApplicationException) {
jsonObject.addProperty("status", ((WebApplicationException) e).getResponse().getStatus());
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index a1b27b5..3763345 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -22,8 +22,6 @@ package org.apache.zeppelin.service;
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN;
import com.google.common.base.Strings;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -35,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
@@ -52,6 +49,7 @@ import org.apache.zeppelin.notebook.AuthorizationService;
import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
import org.apache.zeppelin.notebook.scheduler.SchedulerService;
import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.rest.SessionManager;
import org.apache.zeppelin.rest.exception.BadRequestException;
import org.apache.zeppelin.rest.exception.ForbiddenException;
import org.apache.zeppelin.rest.exception.NoteNotFoundException;
@@ -136,8 +134,19 @@ public class NotebookService {
}
+ /**
+ *
+ * @param notePath
+ * @param defaultInterpreterGroup
+ * @param addingEmptyParagraph
+ * @param context
+ * @param callback
+ * @return
+ * @throws IOException
+ */
public Note createNote(String notePath,
String defaultInterpreterGroup,
+ boolean addingEmptyParagraph,
ServiceContext context,
ServiceCallback<Note> callback) throws IOException {
@@ -150,7 +159,9 @@ public class NotebookService {
Note note = notebook.createNote(normalizeNotePath(notePath), defaultInterpreterGroup,
context.getAutheInfo(), false);
// it's an empty note. so add one paragraph
- note.addNewParagraph(context.getAutheInfo());
+ if (addingEmptyParagraph) {
+ note.addNewParagraph(context.getAutheInfo());
+ }
notebook.saveNote(note, context.getAutheInfo());
callback.onSuccess(note, context);
return note;
@@ -304,6 +315,7 @@ public class NotebookService {
String text,
Map<String, Object> params,
Map<String, Object> config,
+ String sessionId,
boolean failIfDisabled,
boolean blocking,
ServiceContext context,
@@ -353,7 +365,7 @@ public class NotebookService {
try {
notebook.saveNote(note, context.getAutheInfo());
- note.run(p.getId(), blocking, context.getAutheInfo().getUser());
+ note.run(p.getId(), sessionId, blocking, context.getAutheInfo().getUser());
callback.onSuccess(p, context);
return true;
} catch (Exception ex) {
@@ -409,7 +421,7 @@ public class NotebookService {
Map<String, Object> params = (Map<String, Object>) raw.get("params");
Map<String, Object> config = (Map<String, Object>) raw.get("config");
- if (!runParagraph(noteId, paragraphId, title, text, params, config, false, true,
+ if (!runParagraph(noteId, paragraphId, title, text, params, config, null, false, true,
context, callback)) {
// stop execution when one paragraph fails.
return false;
@@ -619,6 +631,37 @@ public class NotebookService {
callback.onSuccess(p, context);
}
+  /**
+   * Provide a new paragraph at the end of the given note for the next session statement.
+   *
+   * <p>While the note holds fewer than {@code maxParagraph} paragraphs a paragraph is simply
+   * appended. Otherwise the first paragraph with a completed status, searching from index 1,
+   * is removed before the new one is appended. The paragraph at index 0 is never removed
+   * (presumably it is reserved for session initialization; confirm against the client).
+   *
+   * @param noteId id of the session note
+   * @param maxParagraph paragraph count at which completed paragraphs start being recycled
+   * @param context service context providing the authentication info
+   * @param callback notified via {@code onFailure} when the note does not exist
+   * @return the newly added paragraph
+   * @throws IOException if the permission check fails, the note does not exist, or no
+   *                     completed paragraph is available for removal
+   */
+  public Paragraph getNextSessionParagraph(String noteId,
+                                           int maxParagraph,
+                                           ServiceContext context,
+                                           ServiceCallback<Paragraph> callback)
+      throws IOException {
+    boolean permitted = checkPermission(noteId, Permission.WRITER,
+        Message.OP.PARAGRAPH_CLEAR_OUTPUT, context, callback);
+    if (!permitted) {
+      throw new IOException("No privilege to access this note");
+    }
+
+    Note note = notebook.getNote(noteId);
+    if (note == null) {
+      callback.onFailure(new NoteNotFoundException(noteId), context);
+      throw new IOException("No such note");
+    }
+
+    if (note.getParagraphCount() >= maxParagraph) {
+      // The note is full: make room by dropping the first completed paragraph after index 0.
+      boolean recycled = false;
+      int index = 1;
+      while (!recycled && index < note.getParagraphCount()) {
+        Paragraph candidate = note.getParagraph(index);
+        if (candidate.getStatus().isCompleted()) {
+          note.removeParagraph(context.getAutheInfo().getUser(), candidate.getId());
+          recycled = true;
+        }
+        index++;
+      }
+      if (!recycled) {
+        throw new IOException("All the paragraphs are not completed, unable to find available paragraph");
+      }
+    }
+    return note.addNewParagraph(context.getAutheInfo());
+  }
+
public void clearParagraphOutput(String noteId,
String paragraphId,
ServiceContext context,
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 2db1d26..0a16c21 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -962,7 +962,7 @@ public class NotebookServer extends WebSocketServlet
String noteName = (String) message.get("name");
String defaultInterpreterGroup = (String) message.get("defaultInterpreterGroup");
- getNotebookService().createNote(noteName, defaultInterpreterGroup, getServiceContext(message),
+ getNotebookService().createNote(noteName, defaultInterpreterGroup, true, getServiceContext(message),
new WebSocketServiceCallback<Note>(conn) {
@Override
public void onSuccess(Note note, ServiceContext context) throws IOException {
@@ -1516,7 +1516,7 @@ public class NotebookServer extends WebSocketServlet
String title = (String) fromMessage.get("title");
Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
- getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config,
+ getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config, null,
false, false, getServiceContext(fromMessage),
new WebSocketServiceCallback<Paragraph>(conn) {
@Override
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java
index 7033929..6a9c8a8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookWebSocketCreator.java
@@ -35,6 +35,7 @@ public class NotebookWebSocketCreator implements WebSocketCreator {
public NotebookWebSocketCreator(NotebookServer notebookServer) {
this.notebookServer = notebookServer;
}
+
public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) {
String origin = request.getHeader("Origin");
if (notebookServer.checkOrigin(request.getHttpServletRequest(), origin)) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 9ebd83c..b6c626d 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -186,6 +186,7 @@ public abstract class AbstractTestRestApi {
zeppelinHome = new File("..");
LOG.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
confDir = new File(zeppelinHome, "conf_" + testClassName);
+ FileUtils.deleteDirectory(confDir);
LOG.info("ZEPPELIN_CONF_DIR: " + confDir.getAbsolutePath());
confDir.mkdirs();
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
index 6039bf2..885a78d 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java
@@ -107,7 +107,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
}
@Test
- public void testRunParagraphJob() throws IOException {
+ public void testRunParagraphJob() throws Exception {
LOG.info("Running testRunParagraphJob");
Note note1 = null;
try {
@@ -117,23 +117,25 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
Paragraph p = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
// run blank paragraph
- PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId() + "?blocking=true", "");
+ PostMethod post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId(), "");
assertThat(post, isAllowed());
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
new TypeToken<Map<String, Object>>() {}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
+ p.waitUntilFinished();
assertEquals(p.getStatus(), Job.Status.FINISHED);
// run non-blank paragraph
p.setText("test");
- post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId() + "?blocking=true", "");
+ post = httpPost("/notebook/job/" + note1.getId() + "/" + p.getId(), "");
assertThat(post, isAllowed());
resp = gson.fromJson(post.getResponseBodyAsString(),
new TypeToken<Map<String, Object>>() {}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
- assertNotEquals(p.getStatus(), Job.Status.READY);
+ p.waitUntilFinished();
+ assertNotEquals(p.getStatus(), Job.Status.FINISHED);
} finally {
// cleanup
if (null != note1) {
@@ -176,10 +178,10 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
p.setText(text);
post = httpPost("/notebook/run/" + note1.getId() + "/" + p.getId(), "");
- assertEquals(500, post.getStatusCode());
+ assertEquals(200, post.getStatusCode());
resp = gson.fromJson(post.getResponseBodyAsString(),
new TypeToken<Map<String, Object>>() {}.getType());
- assertEquals("INTERNAL_SERVER_ERROR", resp.get("status"));
+ assertEquals("OK", resp.get("status"));
StringMap stringMap = (StringMap) resp.get("body");
assertEquals("ERROR", stringMap.get("code"));
List<StringMap> interpreterResults = (List<StringMap>) stringMap.get("msg");
@@ -470,12 +472,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi {
p2.setText("%python user2='abc'\nprint(user2)");
PostMethod post = httpPost("/notebook/job/" + note1.getId() + "?blocking=true", "");
- assertThat(post, isExpectationFailed());
- Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(),
- new TypeToken<Map<String, Object>>() {}.getType());
- assertEquals(resp.get("status"), "EXPECTATION_FAILED");
- assertTrue(resp.get("message").toString().contains("Fail to run note because paragraph"));
- post.releaseConnection();
+ assertThat(post, isAllowed());
assertEquals(Job.Status.ERROR, p1.getStatus());
// p2 will be skipped because p1 is failed.
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java
index b949d05..a3d67c8 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookSecurityRestApiTest.java
@@ -58,7 +58,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
@Test
public void testThatUserCanCreateAndRemoveNote() throws IOException {
- String noteId = createNoteForUser("test", "admin", "password1");
+ String noteId = createNoteForUser("test_1", "admin", "password1");
assertNotNull(noteId);
String id = getNoteIdForUser(noteId, "admin", "password1");
assertThat(id, is(noteId));
@@ -67,7 +67,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
@Test
public void testThatOtherUserCanAccessNoteIfPermissionNotSet() throws IOException {
- String noteId = createNoteForUser("test", "admin", "password1");
+ String noteId = createNoteForUser("test_2", "admin", "password1");
userTryGetNote(noteId, "user1", "password2", isAllowed());
@@ -76,7 +76,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
@Test
public void testThatOtherUserCannotAccessNoteIfPermissionSet() throws IOException {
- String noteId = createNoteForUser("test", "admin", "password1");
+ String noteId = createNoteForUser("test_3", "admin", "password1");
//set permission
String payload = "{ \"owners\": [\"admin\"], \"readers\": [\"user2\"], " +
@@ -94,7 +94,7 @@ public class NotebookSecurityRestApiTest extends AbstractTestRestApi {
@Test
public void testThatWriterCannotRemoveNote() throws IOException {
- String noteId = createNoteForUser("test", "admin", "password1");
+ String noteId = createNoteForUser("test_4", "admin", "password1");
//set permission
String payload = "{ \"owners\": [\"admin\", \"user1\"], \"readers\": [\"user2\"], " +
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index 6e87196..96ef8ee 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -173,7 +173,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
expectedNoteName = "Note " + newNoteId;
}
assertEquals("compare note name", expectedNoteName, newNoteName);
- assertEquals("initial paragraph check failed", 4, newNote.getParagraphs().size());
+ assertEquals("initial paragraph check failed", 3, newNote.getParagraphs().size());
for (Paragraph p : newNote.getParagraphs()) {
if (StringUtils.isEmpty(p.getText())) {
continue;
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index a463617..4c2156a 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -155,14 +155,14 @@ public class NotebookServiceTest {
verify(callback).onSuccess(homeNote, context);
// create note
- Note note1 = notebookService.createNote("/folder_1/note1", "test", context, callback);
+ Note note1 = notebookService.createNote("/folder_1/note1", "test", true, context, callback);
assertEquals("note1", note1.getName());
assertEquals(1, note1.getParagraphCount());
verify(callback).onSuccess(note1, context);
// create duplicated note
reset(callback);
- Note note2 = notebookService.createNote("/folder_1/note1", "test", context, callback);
+ Note note2 = notebookService.createNote("/folder_1/note1", "test", true, context, callback);
assertNull(note2);
ArgumentCaptor<Exception> exception = ArgumentCaptor.forClass(Exception.class);
verify(callback).onFailure(exception.capture(), any(ServiceContext.class));
@@ -203,7 +203,7 @@ public class NotebookServiceTest {
assertEquals("/folder_4/new_name", notesInfo.get(0).getPath());
// create another note
- note2 = notebookService.createNote("/note2", "test", context, callback);
+ note2 = notebookService.createNote("/note2", "test", true, context, callback);
assertEquals("note2", note2.getName());
verify(callback).onSuccess(note2, context);
@@ -334,18 +334,18 @@ public class NotebookServiceTest {
@Test
public void testParagraphOperations() throws IOException {
// create note
- Note note1 = notebookService.createNote("note1", "python", context, callback);
+ Note note1 = notebookService.createNote("note1", "python", false, context, callback);
assertEquals("note1", note1.getName());
- assertEquals(1, note1.getParagraphCount());
+ assertEquals(0, note1.getParagraphCount());
verify(callback).onSuccess(note1, context);
// add paragraph
reset(callback);
- Paragraph p = notebookService.insertParagraph(note1.getId(), 1, new HashMap<>(), context,
+ Paragraph p = notebookService.insertParagraph(note1.getId(), 0, new HashMap<>(), context,
callback);
assertNotNull(p);
verify(callback).onSuccess(p, context);
- assertEquals(2, note1.getParagraphCount());
+ assertEquals(1, note1.getParagraphCount());
// update paragraph
reset(callback);
@@ -365,7 +365,7 @@ public class NotebookServiceTest {
p.getConfig().put("colWidth", "6.0");
p.getConfig().put("title", true);
boolean runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "1+1",
- new HashMap<>(), new HashMap<>(), false, false, context, callback);
+ new HashMap<>(), new HashMap<>(), null, false, false, context, callback);
assertTrue(runStatus);
verify(callback).onSuccess(p, context);
assertEquals(2, p.getConfig().size());
@@ -373,7 +373,7 @@ public class NotebookServiceTest {
// run paragraph synchronously via correct code
reset(callback);
runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "1+1",
- new HashMap<>(), new HashMap<>(), false, true, context, callback);
+ new HashMap<>(), new HashMap<>(), null, false, true, context, callback);
assertTrue(runStatus);
verify(callback).onSuccess(p, context);
assertEquals(2, p.getConfig().size());
@@ -387,7 +387,7 @@ public class NotebookServiceTest {
reset(callback);
runStatus = notebookService.runParagraph(note1.getId(), p.getId(), "my_title", "invalid_code",
- new HashMap<>(), new HashMap<>(), false, true, context, callback);
+ new HashMap<>(), new HashMap<>(), null, false, true, context, callback);
assertTrue(runStatus);
// TODO(zjffdu) Enable it after ZEPPELIN-3699
// assertNotNull(p.getResult());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 1a2e05e..206236b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -419,6 +419,10 @@ public class InterpreterSetting {
}
private String getInterpreterGroupId(ExecutionContext executionContext) {
+ if (!StringUtils.isBlank(executionContext.getInterpreterGroupId())) {
+ return executionContext.getInterpreterGroupId();
+ }
+
if (executionContext.isInIsolatedMode()) {
return name + "-isolated-" + executionContext.getNoteId() + "-" +
executionContext.getStartTime();
@@ -531,6 +535,10 @@ public class InterpreterSetting {
closeInterpreters(new ExecutionContextBuilder().setUser(user).setNoteId(noteId).createExecutionContext());
}
+ public void closeInterpreters(String interpreterGroupId) {
+ closeInterpreters(new ExecutionContextBuilder().setInterpreterGroupId(interpreterGroupId).createExecutionContext());
+ }
+
public void closeInterpreters(ExecutionContext executionContext) {
ManagedInterpreterGroup interpreterGroup = getInterpreterGroup(executionContext);
if (interpreterGroup != null) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 5f67889..819f02b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -340,8 +340,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
}.getType());
String noteId = paraInfos.get("noteId");
String paraId = paraInfos.get("paraId");
- String settingId = RemoteInterpreterUtils.
- getInterpreterSettingId(interpreterGroup.getId());
+ String settingId = ((ManagedInterpreterGroup) interpreterGroup).getInterpreterSetting().getId();
if (noteId != null && paraId != null && settingId != null) {
listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 616bfc3..209d5dc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
/**
* Abstract class for interpreter process
@@ -34,11 +36,14 @@ import java.io.IOException;
public abstract class RemoteInterpreterProcess implements InterpreterClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
private static final Gson GSON = new Gson();
+ private static final SimpleDateFormat START_TIME_FORMATTER =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private int connectTimeout;
protected String intpEventServerHost;
protected int intpEventServerPort;
private PooledRemoteClient<Client> remoteClient;
+ private String startTime;
public RemoteInterpreterProcess(int connectTimeout,
int connectionPoolSize,
@@ -47,6 +52,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
this.connectTimeout = connectTimeout;
this.intpEventServerHost = intpEventServerHost;
this.intpEventServerPort = intpEventServerPort;
+ this.startTime = START_TIME_FORMATTER.format(new Date());
this.remoteClient = new PooledRemoteClient<Client>(() -> {
TSocket transport = new TSocket(getHost(), getPort());
try {
@@ -63,6 +69,10 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
return connectTimeout;
}
+ public String getStartTime() {
+ return startTime;
+ }
+
public void shutdown() {
if (remoteClient != null) {
remoteClient.shutdown();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index a55567b..1a1c08a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -788,8 +788,7 @@ public class Note implements JsonSerializable {
// Must run each paragraph in blocking way.
if (!run(p.getId(), true)) {
LOGGER.warn("Skip running the remain notes because paragraph {} fails", p.getId());
- throw new Exception("Fail to run note because paragraph " + p.getId() + " is failed, result: " +
- p.getReturn());
+ return;
}
} catch (InterpreterNotFoundException e) {
// ignore, because the following run method will fail if interpreter not found.
@@ -834,7 +833,7 @@ public class Note implements JsonSerializable {
* @param blocking Whether run this paragraph in blocking way
*/
public boolean run(String paragraphId, boolean blocking) {
- return run(paragraphId, blocking, null);
+ return run(paragraphId, null, blocking, null);
}
/**
@@ -846,6 +845,7 @@ public class Note implements JsonSerializable {
* @return
*/
public boolean run(String paragraphId,
+ String interpreterGroupId,
boolean blocking,
String ctxUser) {
Paragraph p = getParagraph(paragraphId);
@@ -854,7 +854,7 @@ public class Note implements JsonSerializable {
p = p.getUserParagraph(ctxUser);
p.setListener(this.paragraphJobListener);
- return p.execute(blocking);
+ return p.execute(interpreterGroupId, blocking);
}
/**
@@ -994,12 +994,10 @@ public class Note implements JsonSerializable {
public List<InterpreterSetting> getUsedInterpreterSettings() {
Set<InterpreterSetting> settings = new HashSet<>();
for (Paragraph p : getParagraphs()) {
- try {
- Interpreter intp = p.getBindedInterpreter();
+ Interpreter intp = p.getInterpreter();
+ if (intp != null) {
settings.add((
(ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting());
- } catch (InterpreterNotFoundException e) {
- // ignore this
}
}
return new ArrayList<>(settings);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 336be44..689ecfd 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -81,6 +81,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
private String text;
private String user;
private Date dateUpdated;
+ private int progress;
// paragraph configs like isOpen, colWidth, etc
private Map<String, Object> config = new HashMap<>();
// form and parameter settings
@@ -93,6 +94,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
private transient String intpText;
private transient String scriptText;
private transient Interpreter interpreter;
+ private transient String interpreterGroupId;
private transient Note note;
private transient AuthenticationInfo subject;
// personalized
@@ -248,15 +250,21 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
.setDefaultInterpreterGroup(note.getDefaultInterpreterGroup())
.setInIsolatedMode(note.isIsolatedMode())
.setStartTime(note.getStartTime())
+ .setInterpreterGroupId(interpreterGroupId)
.createExecutionContext();
return this.note.getInterpreterFactory().getInterpreter(intpText, executionContext);
}
+ @VisibleForTesting
public void setInterpreter(Interpreter interpreter) {
this.interpreter = interpreter;
}
+ public Interpreter getInterpreter() {
+ return interpreter;
+ }
+
public List<InterpreterCompletion> completion(String buffer, int cursor) {
setText(buffer);
try {
@@ -297,7 +305,8 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
public int progress() {
try {
if (this.interpreter != null) {
- return this.interpreter.getProgress(getInterpreterContext());
+ this.progress = this.interpreter.getProgress(getInterpreterContext());
+ return this.progress;
} else {
return 0;
}
@@ -319,13 +328,18 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
return checkEmptyConfig && Strings.isNullOrEmpty(scriptText) && localProperties.isEmpty();
}
+ public boolean execute(boolean blocking) {
+ return execute(null, blocking);
+ }
+
/**
* Return true only when paragraph run successfully with state of FINISHED.
* @param blocking
* @return
*/
- public boolean execute(boolean blocking) {
+ public boolean execute(String interpreterGroupId, boolean blocking) {
try {
+ this.interpreterGroupId = interpreterGroupId;
this.interpreter = getBindedInterpreter();
InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup)
interpreter.getInterpreterGroup()).getInterpreterSetting();
@@ -374,6 +388,15 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen
}
@Override
+ public void setStatus(Status status) {
+ super.setStatus(status);
+ // reset interpreterGroupId when paragraph is completed.
+ if (status.isCompleted()) {
+ this.interpreterGroupId = null;
+ }
+ }
+
+ @Override
protected InterpreterResult jobRun() throws Throwable {
try {
if (localProperties.getOrDefault("isRecover", "false").equals("false")) {