You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by an...@apache.org on 2016/05/28 19:13:47 UTC
[2/2] incubator-zeppelin git commit: ZeppelinHub notebook
storage/connection repository
ZeppelinHub notebook storage/connection repository
### What is this PR for?
This is to add [ZeppelinHub](https://www.zeppelinhub.com) notebook storage/connection layer to the Zeppelin.
### What type of PR is it?
Feature
### Todos
* [x] - NotebookRepo rest api
* [x] - ZeppelinHub websocket client
* [x] - Zeppelin websocket client
* [x] - Tests
* [x] - More QA (authentication consistency, etc.)
* [x] - Address review comments
### What is the Jira issue?
### How should this be tested?
First of all, you may need to create account in [ZeppelinHub](https://www.zeppelinhub.com).
Then you can set connection by following guides in [here](https://github.com/khalidhuseynov/incubator-zeppelin/blob/feat/zeppelinhub-storage/docs/storage/storage.md#notebook-storage-in-zeppelinhub--).
Finally you should be able to access and manipulate your notebooks from inside of your ZeppelinHub account.
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes
Author: Khalid Huseynov <kh...@nflabs.com>
Author: Anthony Corbacho <co...@gmail.com>
Closes #880 from khalidhuseynov/feat/zeppelinhub-storage and squashes the following commits:
b480176 [Anthony Corbacho] Change debug log from info to error
db8d1e8 [Anthony Corbacho] Remove spave after comment
1c78d8c [Khalid Huseynov] address @bzz comments
b1fe8a3 [Khalid Huseynov] move jetty.version property to root
acf58d5 [Khalid Huseynov] address @AhyoungRyu comments
8de3d1f [Khalid Huseynov] add zeppelin hearbeat
6c3aff2 [Khalid Huseynov] remove 074098eeb8ace6545c159d26657768079ae4b208 and ZeppelinHubConnection
efcfca3 [Khalid Huseynov] more docs
28f6bf7 [Khalid Huseynov] add authentication to zeppelin
1ca3d65 [Khalid Huseynov] scheduler service as singleton
32497dc [Anthony Corbacho] When user run a notebook, check if the websocket session exist, open it if close
36176ea [Anthony Corbacho] change log info to debug
8ad482d [Anthony Corbacho] Add routine to check ws connection to zeppelinhub is open, if not open it (keep trying)
27a4042 [Anthony Corbacho] rename fct hub -> ZeppelinHub
4c47e43 [Anthony Corbacho] code cleanup & walkthrough
636b9fa [Khalid Huseynov] fix tests
4b14a98 [Anthony Corbacho] add ASF headers
4ac8115 [Khalid Huseynov] add config info and docs
18ee802 [Khalid Huseynov] improve repo tests
2d27ec6 [Khalid Huseynov] add zeppelinhub ws client test
ebcf692 [Khalid Huseynov] add zeppelin client tests
9f1b8bf [Khalid Huseynov] add zeppelin websocket client
1ce01ef [Khalid Huseynov] add rest package
45bec47 [Anthony Corbacho] add zeppelinhub websocket client
f8f168a [Anthony Corbacho] Add zeppelinhubmessage class + tests
a1c5f52 [Anthony Corbacho] prerequisite changes for starting websocket client
45ed9e1 [Khalid Huseynov] initial rest api
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/8cde5c9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/8cde5c9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/8cde5c9b
Branch: refs/heads/master
Commit: 8cde5c9bd47e3e9682d016274a808e096ab34d0b
Parents: 2154476
Author: Khalid Huseynov <kh...@nflabs.com>
Authored: Sat May 28 11:43:07 2016 -0700
Committer: Anthony Corbacho <co...@gmail.com>
Committed: Sat May 28 12:13:11 2016 -0700
----------------------------------------------------------------------
conf/zeppelin-env.cmd.template | 6 +
conf/zeppelin-env.sh.template | 8 +-
conf/zeppelin-site.xml.template | 11 +-
docs/_includes/themes/zeppelin/_navigation.html | 1 +
docs/storage/storage.md | 31 ++
pom.xml | 1 +
zeppelin-server/pom.xml | 1 -
.../org/apache/zeppelin/socket/Message.java | 151 ----------
.../apache/zeppelin/socket/NotebookServer.java | 3 +-
.../zeppelin/socket/NotebookServerTest.java | 3 +-
zeppelin-zengine/pom.xml | 39 +++
.../repo/zeppelinhub/ZeppelinHubRepo.java | 195 +++++++++++++
.../rest/ZeppelinhubRestApiHandler.java | 210 ++++++++++++++
.../zeppelinhub/security/Authentication.java | 228 +++++++++++++++
.../repo/zeppelinhub/websocket/Client.java | 79 +++++
.../zeppelinhub/websocket/ZeppelinClient.java | 286 +++++++++++++++++++
.../websocket/ZeppelinhubClient.java | 251 ++++++++++++++++
.../websocket/listener/ZeppelinWebsocket.java | 74 +++++
.../listener/ZeppelinhubWebsocket.java | 82 ++++++
.../websocket/protocol/ZeppelinHubOp.java | 30 ++
.../websocket/protocol/ZeppelinhubMessage.java | 85 ++++++
.../websocket/scheduler/SchedulerService.java | 62 ++++
.../websocket/scheduler/ZeppelinHeartbeat.java | 44 +++
.../scheduler/ZeppelinHubHeartbeat.java | 45 +++
.../websocket/session/ZeppelinhubSession.java | 63 ++++
.../websocket/utils/ZeppelinhubUtils.java | 98 +++++++
.../zeppelin/notebook/socket/Message.java | 151 ++++++++++
.../repo/zeppelinhub/ZeppelinHubRepoTest.java | 150 ++++++++++
.../websocket/ZeppelinClientTest.java | 123 ++++++++
.../websocket/ZeppelinhubClientTest.java | 72 +++++
.../websocket/mock/MockEchoWebsocketServer.java | 46 +++
.../websocket/mock/MockEventServlet.java | 14 +
.../websocket/mock/MockEventSocket.java | 38 +++
.../protocol/ZeppelinhubMessageTest.java | 43 +++
.../src/test/resources/list_of_notes | 41 +++
zeppelin-zengine/src/test/resources/note | 16 ++
36 files changed, 2625 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/conf/zeppelin-env.cmd.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.cmd.template b/conf/zeppelin-env.cmd.template
index 06799b5..59953dc 100644
--- a/conf/zeppelin-env.cmd.template
+++ b/conf/zeppelin-env.cmd.template
@@ -34,6 +34,7 @@ REM set ZEPPELIN_NOTEBOOK_S3_USER REM User in bucket where notebook
REM set ZEPPELIN_IDENT_STRING REM A string representing this instance of zeppelin. $USER by default.
REM set ZEPPELIN_NICENESS REM The scheduling priority for daemons. Defaults to 0.
REM set ZEPPELIN_INTERPRETER_LOCALREPO REM Local repository for interpreter's additional dependency loading
+REM set ZEPPELIN_NOTEBOOK_STORAGE REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
REM Spark interpreter configuration
@@ -62,3 +63,8 @@ REM set ZEPPELIN_SPARK_USEHIVECONTEXT REM Use HiveContext instead of SQLContext
REM set ZEPPELIN_SPARK_CONCURRENTSQL REM Execute multiple SQL concurrently if set true. false by default.
REM set ZEPPELIN_SPARK_MAXRESULT REM Max number of SparkSQL result to display. 1000 by default.
+REM ZeppelinHub connection configuration
+REM
+REM set ZEPPELINHUB_API_ADDRESS REM Refers to the address of the ZeppelinHub service in use
+REM set ZEPPELINHUB_API_TOKEN REM Refers to the Zeppelin instance token of the user
+REM set ZEPPELINHUB_USER_KEY REM Optional, when using Zeppelin with authentication.
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index 6279de7..be6f3dd 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -35,7 +35,7 @@
# export ZEPPELIN_IDENT_STRING # A string representing this instance of zeppelin. $USER by default.
# export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0.
# export ZEPPELIN_INTERPRETER_LOCALREPO # Local repository for interpreter's additional dependency loading
-
+# export ZEPPELIN_NOTEBOOK_STORAGE # Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
#### Spark interpreter configuration ####
@@ -70,3 +70,9 @@
# export HBASE_HOME= # (require) Under which HBase scripts and configuration should be
# export HBASE_CONF_DIR= # (optional) Alternatively, configuration directory can be set to point to the directory that has hbase-site.xml
+
+#### ZeppelinHub connection configuration ####
+# export ZEPPELINHUB_API_ADDRESS # Refers to the address of the ZeppelinHub service in use
+# export ZEPPELINHUB_API_TOKEN # Refers to the Zeppelin instance token of the user
+# export ZEPPELINHUB_USER_KEY # Optional, when using Zeppelin with authentication.
+
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index f7a5405..bd61153 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -141,7 +141,7 @@
</property>
-->
-<!-- For versioning your local norebook storage using Git repository
+<!-- For versioning your local notebook storage using Git repository
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.GitNotebookRepo</value>
@@ -149,6 +149,15 @@
</property>
-->
+<!-- For connecting your Zeppelin with ZeppelinHub -->
+<!--
+<property>
+ <name>zeppelin.notebook.storage</name>
+ <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo, org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo</value>
+ <description>two notebook persistence layers (local + ZeppelinHub)</description>
+</property>
+-->
+
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</value>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/docs/_includes/themes/zeppelin/_navigation.html
----------------------------------------------------------------------
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index 10d7be0..8a46b74 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -87,6 +87,7 @@
<li><a href="{{BASE_PATH}}/storage/storage.html#Git">Git Storage</a></li>
<li><a href="{{BASE_PATH}}/storage/storage.html#S3">S3 Storage</a></li>
<li><a href="{{BASE_PATH}}/storage/storage.html#Azure">Azure Storage</a></li>
+ <li><a href="{{BASE_PATH}}/storage/storage.html#ZeppelinHub">ZeppelinHub Storage</a></li>
<li role="separator" class="divider"></li>
<!-- li><span><b>REST API</b><span></li -->
<li><a href="{{BASE_PATH}}/rest-api/rest-interpreter.html">Interpreter API</a></li>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/docs/storage/storage.md
----------------------------------------------------------------------
diff --git a/docs/storage/storage.md b/docs/storage/storage.md
index ea534bf..25c62bb 100644
--- a/docs/storage/storage.md
+++ b/docs/storage/storage.md
@@ -201,3 +201,34 @@ Optionally, you can specify Azure folder structure name in the file **zeppelin-s
<description>optional user name for Azure folder structure</description>
</property>
```
+
+</br>
+#### Notebook Storage in ZeppelinHub <a name="ZeppelinHub"></a>
+
+ZeppelinHub storage layer allows out of the box connection of Zeppelin instance with your ZeppelinHub account. First of all, you need to either comment out the following property in **zeppelin-site.xml**:
+
+```
+<!-- For connecting your Zeppelin with ZeppelinHub -->
+<!--
+<property>
+ <name>zeppelin.notebook.storage</name>
+ <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo, org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo</value>
+ <description>two notebook persistence layers (local + ZeppelinHub)</description>
+</property>
+-->
+```
+
+or set the environment variable in the file **zeppelin-env.sh**:
+
+```
+export ZEPPELIN_NOTEBOOK_STORAGE="org.apache.zeppelin.notebook.repo.VFSNotebookRepo, org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo"
+```
+
+Secondly, you need to set the environment variables in the file **zeppelin-env.sh**:
+
+```
+export ZEPPELINHUB_API_TOKEN = ZeppelinHub token
+export ZEPPELINHUB_API_ADDRESS = address of ZeppelinHub service (e.g. https://www.zeppelinhub.com)
+```
+
+You can get more information on generating `token` and using authentication on the corresponding [help page](http://help.zeppelinhub.com/zeppelin_integration/#add-a-new-zeppelin-instance-and-generate-a-token).
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 77f3667..28508e7 100755
--- a/pom.xml
+++ b/pom.xml
@@ -118,6 +118,7 @@
<libthrift.version>0.9.2</libthrift.version>
<gson.version>2.2</gson.version>
<guava.version>15.0</guava.version>
+ <jetty.version>9.2.15.v20160210</jetty.version>
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index b1c0cef..19a95e4 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -35,7 +35,6 @@
<properties>
<cxf.version>2.7.7</cxf.version>
- <jetty.version>9.2.15.v20160210</jetty.version>
<commons.httpclient.version>4.3.6</commons.httpclient.version>
</properties>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
deleted file mode 100644
index 913e184..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.socket;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Zeppelin websocker massage template class.
- */
-public class Message {
- /**
- * Representation of event type.
- */
- public static enum OP {
- GET_HOME_NOTE, // [c-s] load note for home screen
-
- GET_NOTE, // [c-s] client load note
- // @param id note id
-
- NOTE, // [s-c] note info
- // @param note serlialized Note object
-
- PARAGRAPH, // [s-c] paragraph info
- // @param paragraph serialized paragraph object
-
- PROGRESS, // [s-c] progress update
- // @param id paragraph id
- // @param progress percentage progress
-
- NEW_NOTE, // [c-s] create new notebook
- DEL_NOTE, // [c-s] delete notebook
- // @param id note id
- CLONE_NOTE, // [c-s] clone new notebook
- // @param id id of note to clone
- // @param name name fpor the cloned note
- IMPORT_NOTE, // [c-s] import notebook
- // @param object notebook
- NOTE_UPDATE,
-
- RUN_PARAGRAPH, // [c-s] run paragraph
- // @param id paragraph id
- // @param paragraph paragraph content.ie. script
- // @param config paragraph config
- // @param params paragraph params
-
- COMMIT_PARAGRAPH, // [c-s] commit paragraph
- // @param id paragraph id
- // @param title paragraph title
- // @param paragraph paragraph content.ie. script
- // @param config paragraph config
- // @param params paragraph params
-
- CANCEL_PARAGRAPH, // [c-s] cancel paragraph run
- // @param id paragraph id
-
- MOVE_PARAGRAPH, // [c-s] move paragraph order
- // @param id paragraph id
- // @param index index the paragraph want to go
-
- INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph
- // @param target index
-
- COMPLETION, // [c-s] ask completion candidates
- // @param id
- // @param buf current code
- // @param cursor cursor position in code
-
- COMPLETION_LIST, // [s-c] send back completion candidates list
- // @param id
- // @param completions list of string
-
- LIST_NOTES, // [c-s] ask list of note
- RELOAD_NOTES_FROM_REPO, // [c-s] reload notes from repo
-
- NOTES_INFO, // [s-c] list of note infos
- // @param notes serialized List<NoteInfo> object
-
- PARAGRAPH_REMOVE,
- PARAGRAPH_CLEAR_OUTPUT,
- PARAGRAPH_APPEND_OUTPUT, // [s-c] append output
- PARAGRAPH_UPDATE_OUTPUT, // [s-c] update (replace) output
- PING,
- AUTH_INFO,
-
- ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object
- ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del
-
- ANGULAR_OBJECT_UPDATED, // [c-s] angular object value updated,
-
- ANGULAR_OBJECT_CLIENT_BIND, // [c-s] angular object updated from AngularJS z object
-
- ANGULAR_OBJECT_CLIENT_UNBIND, // [c-s] angular object unbind from AngularJS z object
-
- LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations
- CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
- // @param settings serialized Map<String, String> object
-
- CHECKPOINT_NOTEBOOK // [c-s] checkpoint notebook to storage repository
- // @param noteId
- // @param checkpointName
-
- }
-
- public OP op;
- public Map<String, Object> data = new HashMap<String, Object>();
- public String ticket = "anonymous";
- public String principal = "anonymous";
- public String roles = "";
-
- public Message(OP op) {
- this.op = op;
- }
-
- public Message put(String k, Object v) {
- data.put(k, v);
- return this;
- }
-
- public Object get(String k) {
- return data.get(k);
- }
-
- public <T> T getType(String key) {
- return (T) data.get(key);
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder("Message{");
- sb.append("data=").append(data);
- sb.append(", op=").append(op);
- sb.append('}');
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 05a0ae8..7d50809 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -42,10 +42,11 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.*;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
-import org.apache.zeppelin.socket.Message.OP;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.utils.SecurityUtils;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 4cf57e3..b904b68 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -30,9 +30,10 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.rest.AbstractTestRestApi;
import org.apache.zeppelin.server.ZeppelinServer;
-import org.apache.zeppelin.socket.Message.OP;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 9be82ec..30e211d 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -128,6 +128,24 @@
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-client</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-client</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-client</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
@@ -221,6 +239,27 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-server</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
new file mode 100644
index 0000000..3d344b4
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * ZeppelinHub repo class.
+ */
+public class ZeppelinHubRepo implements NotebookRepo {
+ private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
+ private static final String DEFAULT_SERVER = "https://www.zeppelinhub.com";
+ static final String ZEPPELIN_CONF_PROP_NAME_SERVER = "zeppelinhub.api.address";
+ static final String ZEPPELIN_CONF_PROP_NAME_TOKEN = "zeppelinhub.api.token";
+ public static final String TOKEN_HEADER = "X-Zeppelin-Token";
+ private static final Gson GSON = new Gson();
+ private static final Note EMPTY_NOTE = new Note();
+ private final Client websocketClient;
+
+ private String token;
+ private ZeppelinhubRestApiHandler restApiClient;
+
+ public ZeppelinHubRepo(ZeppelinConfiguration conf) {
+ String zeppelinHubUrl = getZeppelinHubUrl(conf);
+ LOG.info("Initializing ZeppelinHub integration module");
+ token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
+ restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
+
+ websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
+ getZeppelinhubWebsocketUri(conf), token, conf);
+ websocketClient.start();
+ }
+
+ private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
+ URI apiRoot = api;
+ String scheme = apiRoot.getScheme();
+ int port = apiRoot.getPort();
+ if (port <= 0) {
+ port = (scheme != null && scheme.equals("https")) ? 443 : 80;
+ }
+
+ if (scheme == null) {
+ LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
+ apiRoot, DEFAULT_SERVER);
+ apiRoot = new URI(DEFAULT_SERVER);
+ scheme = apiRoot.getScheme();
+ port = apiRoot.getPort();
+ if (port <= 0) {
+ port = (scheme != null && scheme.equals("https")) ? 443 : 80;
+ }
+ }
+ String ws = scheme.equals("https") ? "wss://" : "ws://";
+ return ws + apiRoot.getHost() + ":" + port + "/async";
+ }
+
+ String getZeppelinhubWebsocketUri(ZeppelinConfiguration conf) {
+ String zeppelinHubUri = StringUtils.EMPTY;
+ try {
+ zeppelinHubUri = getZeppelinHubWsUri(new URI(conf.getString("ZEPPELINHUB_API_ADDRESS",
+ ZEPPELIN_CONF_PROP_NAME_SERVER, DEFAULT_SERVER)));
+ } catch (URISyntaxException e) {
+ LOG.error("Cannot get ZeppelinHub URI", e);
+ }
+ return zeppelinHubUri;
+ }
+
+ private String getZeppelinWebsocketUri(ZeppelinConfiguration conf) {
+ int port = conf.getServerPort();
+ if (port <= 0) {
+ port = 80;
+ }
+ String ws = conf.useSsl() ? "wss" : "ws";
+ return ws + "://localhost:" + port + "/ws";
+ }
+
+ // Used in tests
+ void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
+ restApiClient = zeppelinhub;
+ }
+
+ String getZeppelinHubUrl(ZeppelinConfiguration conf) {
+ if (conf == null) {
+ LOG.error("Invalid configuration, cannot be null. Using default address {}", DEFAULT_SERVER);
+ return DEFAULT_SERVER;
+ }
+ URI apiRoot;
+ String zeppelinhubUrl;
+ try {
+ String url = conf.getString("ZEPPELINHUB_API_ADDRESS",
+ ZEPPELIN_CONF_PROP_NAME_SERVER,
+ DEFAULT_SERVER);
+ apiRoot = new URI(url);
+ } catch (URISyntaxException e) {
+ LOG.error("Invalid zeppelinhub url, using default address {}", DEFAULT_SERVER, e);
+ return DEFAULT_SERVER;
+ }
+
+ String scheme = apiRoot.getScheme();
+ if (scheme == null) {
+ LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
+ apiRoot, DEFAULT_SERVER);
+ zeppelinhubUrl = DEFAULT_SERVER;
+ } else {
+ zeppelinhubUrl = scheme + "://" + apiRoot.getHost();
+ if (apiRoot.getPort() > 0) {
+ zeppelinhubUrl += ":" + apiRoot.getPort();
+ }
+ }
+ return zeppelinhubUrl;
+ }
+
+ @Override
+ public List<NoteInfo> list() throws IOException {
+ String response = restApiClient.asyncGet("");
+ List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
+ if (notes == null) {
+ return Collections.emptyList();
+ }
+ LOG.info("ZeppelinHub REST API listing notes ");
+ return notes;
+ }
+
+ @Override
+ public Note get(String noteId) throws IOException {
+ if (StringUtils.isBlank(noteId)) {
+ return EMPTY_NOTE;
+ }
+ //String response = zeppelinhubHandler.get(noteId);
+ String response = restApiClient.asyncGet(noteId);
+ Note note = GSON.fromJson(response, Note.class);
+ if (note == null) {
+ return EMPTY_NOTE;
+ }
+ LOG.info("ZeppelinHub REST API get note {} ", noteId);
+ return note;
+ }
+
+ @Override
+ public void save(Note note) throws IOException {
+ if (note == null) {
+ throw new IOException("Zeppelinhub failed to save empty note");
+ }
+ String notebook = GSON.toJson(note);
+ restApiClient.asyncPut(notebook);
+ LOG.info("ZeppelinHub REST API saving note {} ", note.id());
+ }
+
+ @Override
+ public void remove(String noteId) throws IOException {
+ restApiClient.asyncDel(noteId);
+ LOG.info("ZeppelinHub REST API removing note {} ", noteId);
+ }
+
+ @Override
+ public void close() {
+ websocketClient.stop();
+ }
+
+ @Override
+ public void checkpoint(String noteId, String checkPointName) throws IOException {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
new file mode 100644
index 0000000..8f9b2e5
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.InputStreamResponseListener;
+import org.eclipse.jetty.client.util.StringContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST API handler.
+ *
+ */
+public class ZeppelinhubRestApiHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
+ public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
+ private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
+ private static boolean PROXY_ON = false;
+ private static String PROXY_HOST;
+ private static int PROXY_PORT;
+
+ private final HttpClient client;
+ private final String zepelinhubUrl;
+ private final String token;
+
+ public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl,
+ String token) {
+ return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
+ }
+
+ private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
+ this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
+ this.token = token;
+
+ //TODO(khalid):to make proxy conf consistent with Zeppelin confs
+ //readProxyConf();
+ client = getAsyncClient();
+
+ try {
+ client.start();
+ } catch (Exception e) {
+ LOG.error("Cannot initialize ZeppelinHub REST async client", e);
+ }
+
+ }
+
+ private void readProxyConf() {
+ //try reading http_proxy
+ String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
+ System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
+ if (StringUtils.isBlank(proxyHostString)) {
+ //try https_proxy if no http_proxy
+ proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
+ System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
+ }
+
+ if (StringUtils.isBlank(proxyHostString)) {
+ PROXY_ON = false;
+ } else {
+ // host format - http://domain:port/
+ String[] parts = proxyHostString.replaceAll("/", "").split(":");
+ if (parts.length != 3) {
+ LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString);
+ PROXY_ON = false;
+ return;
+ }
+ PROXY_HOST = parts[1];
+ PROXY_PORT = Integer.parseInt(parts[2]);
+ LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]);
+ PROXY_ON = true;
+ }
+ }
+
+ private HttpClient getAsyncClient() {
+ SslContextFactory sslContextFactory = new SslContextFactory();
+ HttpClient httpClient = new HttpClient(sslContextFactory);
+
+ // Configure HttpClient
+ httpClient.setFollowRedirects(false);
+ httpClient.setMaxConnectionsPerDestination(100);
+ // Config considerations
+ //TODO(khalid): consider using proxy
+ //TODO(khalid): consider whether require to follow redirects
+ //TODO(khalid): consider multi-threaded connection manager case
+
+ return httpClient;
+ }
+
+ public String asyncGet(String argument) throws IOException {
+ String note = StringUtils.EMPTY;
+
+ InputStreamResponseListener listener = new InputStreamResponseListener();
+ client.newRequest(zepelinhubUrl + argument)
+ .header(ZEPPELIN_TOKEN_HEADER, token)
+ .send(listener);
+
+ // Wait for the response headers to arrive
+ Response response;
+ try {
+ response = listener.get(30, TimeUnit.SECONDS);
+ } catch (InterruptedException | TimeoutException | ExecutionException e) {
+ LOG.error("Cannot perform Get request to ZeppelinHub", e);
+ throw new IOException("Cannot load note from ZeppelinHub", e);
+ }
+
+ int code = response.getStatus();
+ if (code == 200) {
+ try (InputStream responseContent = listener.getInputStream()) {
+ note = IOUtils.toString(responseContent, "UTF-8");
+ }
+ } else {
+ LOG.error("ZeppelinHub Get {} returned with status {} ", zepelinhubUrl + argument, code);
+ throw new IOException("Cannot load note from ZeppelinHub");
+ }
+ return note;
+ }
+
+ public void asyncPut(String jsonNote) throws IOException {
+ if (StringUtils.isBlank(jsonNote)) {
+ LOG.error("Cannot save empty note/string to ZeppelinHub");
+ return;
+ }
+
+ client.newRequest(zepelinhubUrl).method(HttpMethod.PUT)
+ .header(ZEPPELIN_TOKEN_HEADER, token)
+ .content(new StringContentProvider(jsonNote, "UTF-8"), "application/json;charset=UTF-8")
+ .send(new BufferingResponseListener() {
+
+ @Override
+ public void onComplete(Result res) {
+ if (!res.isFailed() && res.getResponse().getStatus() == 200) {
+ LOG.info("Successfully saved note to ZeppelinHub with {}",
+ res.getResponse().getStatus());
+ } else {
+ LOG.warn("Failed to save note to ZeppelinHub with HttpStatus {}",
+ res.getResponse().getStatus());
+ }
+ }
+
+ @Override
+ public void onFailure(Response response, Throwable failure) {
+ LOG.error("Failed to save note to ZeppelinHub: {}", response.getReason(), failure);
+ }
+ });
+ }
+
+ public void asyncDel(String argument) {
+ if (StringUtils.isBlank(argument)) {
+ LOG.error("Cannot delete empty note from ZeppelinHub");
+ return;
+ }
+ client.newRequest(zepelinhubUrl + argument)
+ .method(HttpMethod.DELETE)
+ .header(ZEPPELIN_TOKEN_HEADER, token)
+ .send(new BufferingResponseListener() {
+
+ @Override
+ public void onComplete(Result res) {
+ if (!res.isFailed() && res.getResponse().getStatus() == 200) {
+ LOG.info("Successfully removed note from ZeppelinHub with {}",
+ res.getResponse().getStatus());
+ } else {
+ LOG.warn("Failed to remove note from ZeppelinHub with HttpStatus {}",
+ res.getResponse().getStatus());
+ }
+ }
+
+ @Override
+ public void onFailure(Response response, Throwable failure) {
+ LOG.error("Failed to remove note from ZeppelinHub: {}", response.getReason(), failure);
+ }
+ });
+ }
+
+ public void close() {
+ try {
+ client.stop();
+ } catch (Exception e) {
+ LOG.info("Couldn't stop ZeppelinHub client properly", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java
new file mode 100644
index 0000000..4b8b42d
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java
@@ -0,0 +1,228 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.security;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.util.Collections;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.NameValuePair;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Authentication module.
+ *
+ */
+public class Authentication implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(Authentication.class);
+ private String principal = "anonymous";
+ private String ticket = "anonymous";
+ private String roles = StringUtils.EMPTY;
+
+ private final HttpClient client;
+ private String loginEndpoint;
+
+ // Cipher is an AES in CBC mode
+ private static final String CIPHER_ALGORITHM = "AES";
+ private static final String CIPHER_MODE = "AES/CBC/PKCS5PADDING";
+ private static final String KEY = "AbtEr99DxsWWbJkP";
+ private static final int ivSize = 16;
+
+ private static final String ZEPPELIN_CONF_ANONYMOUS_ALLOWED = "zeppelin.anonymous.allowed";
+ private static final String ZEPPELINHUB_USER_KEY = "zeppelinhub.user.key";
+ private String token;
+ private boolean authEnabled;
+ private boolean authenticated;
+ String userKey;
+
+ private Gson gson = new Gson();
+ private static Authentication instance = null;
+
+ public static Authentication initialize(String token, ZeppelinConfiguration conf) {
+ if (instance == null && conf != null) {
+ instance = new Authentication(token, conf);
+ }
+ return instance;
+ }
+
+ public static Authentication getInstance() {
+ return instance;
+ }
+
+ private Authentication(String token, ZeppelinConfiguration conf) {
+ MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
+ client = new HttpClient(connectionManager);
+ this.token = token;
+
+ authEnabled = !conf.getBoolean("ZEPPELIN_ALLOW_ANONYMOUS",
+ ZEPPELIN_CONF_ANONYMOUS_ALLOWED, true);
+
+ userKey = conf.getString("ZEPPELINHUB_USER_KEY",
+ ZEPPELINHUB_USER_KEY, "");
+
+ loginEndpoint = getLoginEndpoint(conf);
+ }
+
+ public String getPrincipal() {
+ return this.principal;
+ }
+
+ public String getTicket() {
+ return this.ticket;
+ }
+
+ public String getRoles() {
+ return this.roles;
+ }
+
+ public boolean isAuthenticated() {
+ return authenticated;
+ }
+ private String getLoginEndpoint(ZeppelinConfiguration conf) {
+ int port = conf.getInt("ZEPPELIN_PORT", "zeppelin.server.port" , 8080);
+ if (port <= 0) {
+ port = 8080;
+ }
+ String scheme = "http";
+ if (conf.useSsl()) {
+ scheme = "https";
+ }
+ String endpoint = scheme + "://localhost:" + port + "/api/login";
+ return endpoint;
+ }
+
+ public boolean authenticate() {
+ if (authEnabled) {
+ if (!StringUtils.isEmpty(userKey)) {
+ String authKey = getAuthKey(userKey);
+ Map<String, String> authCredentials = login(authKey, loginEndpoint);
+ if (isEmptyMap(authCredentials)) {
+ return false;
+ }
+ principal = authCredentials.containsKey("principal") ? authCredentials.get("principal")
+ : principal;
+ ticket = authCredentials.containsKey("ticket") ? authCredentials.get("ticket") : ticket;
+ roles = authCredentials.containsKey("roles") ? authCredentials.get("roles") : roles;
+ LOG.info("Authenticated into Zeppelin as {} and roles {}", principal, roles);
+ return true;
+ } else {
+ LOG.warn("ZEPPELINHUB_USER_KEY isn't provided. Please provide your credentials"
+ + "for your instance in ZeppelinHub website and generate your key.");
+ }
+ }
+ return false;
+ }
+
+ // returns login:password
+ private String getAuthKey(String userKey) {
+ LOG.debug("Encrypted user key is {}", userKey);
+ if (StringUtils.isBlank(userKey)) {
+ LOG.warn("ZEPPELINHUB_USER_KEY is blank");
+ return StringUtils.EMPTY;
+ }
+ //use hashed token as a salt
+ String hashedToken = Integer.toString(token.hashCode());
+ return decrypt(userKey, hashedToken);
+ }
+
+ private String decrypt(String value, String initVector) {
+ LOG.debug("IV is {}, IV length is {}", initVector, initVector.length());
+ if (StringUtils.isBlank(value) || StringUtils.isBlank(initVector)) {
+ LOG.error("String to decode or salt is not provided");
+ return StringUtils.EMPTY;
+ }
+ try {
+ IvParameterSpec iv = generateIV(initVector);
+ Key key = generateKey();
+
+ Cipher cipher = Cipher.getInstance(CIPHER_MODE);
+ cipher.init(Cipher.DECRYPT_MODE, key, iv);
+
+ byte[] decryptedString = Base64.decodeBase64(toBytes(value));
+ decryptedString = cipher.doFinal(decryptedString);
+ return new String(decryptedString);
+ } catch (GeneralSecurityException e) {
+ LOG.error("Error when decrypting", e);
+ return StringUtils.EMPTY;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, String> login(String authKey, String endpoint) {
+ String[] credentials = authKey.split(":");
+ if (credentials.length != 2) {
+ return Collections.emptyMap();
+ }
+ PostMethod post = new PostMethod(endpoint);
+ post.addRequestHeader("Origin", "http://localhost");
+ post.addParameter(new NameValuePair("userName", credentials[0]));
+ post.addParameter(new NameValuePair("password", credentials[1]));
+ try {
+ int code = client.executeMethod(post);
+ if (code == HttpStatus.SC_OK) {
+ String content = post.getResponseBodyAsString();
+ Map<String, Object> resp = gson.fromJson(content,
+ new TypeToken<Map<String, Object>>() {}.getType());
+ LOG.info("Received from Zeppelin LoginRestApi : " + content);
+ return (Map<String, String>) resp.get("body");
+ } else {
+ LOG.error("Failed Zeppelin login {}, status code {}", endpoint, code);
+ return Collections.emptyMap();
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot login into Zeppelin", e);
+ return Collections.emptyMap();
+ }
+ }
+
+ private Key generateKey() {
+ return new SecretKeySpec(toBytes(KEY), CIPHER_ALGORITHM);
+ }
+
+ private byte[] toBytes(String value) {
+ byte[] bytes;
+ try {
+ bytes = value.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ LOG.warn("UTF-8 isn't supported ", e);
+ bytes = value.getBytes();
+ }
+ return bytes;
+ }
+
+ private IvParameterSpec generateIV(String ivString) {
+ byte[] ivFromBytes = toBytes(ivString);
+ byte[] iv16ToBytes = new byte[ivSize];
+ System.arraycopy(ivFromBytes, 0, iv16ToBytes, 0, Math.min(ivFromBytes.length, ivSize));
+ return new IvParameterSpec(iv16ToBytes);
+ }
+
+ private boolean isEmptyMap(Map<String, String> map) {
+ return map == null || map.isEmpty();
+ }
+
+ @Override
+ public void run() {
+ authenticated = authenticate();
+ LOG.info("Scheduled authentication status is {}", authenticated);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
new file mode 100644
index 0000000..fe2eabf
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client to connect Zeppelin and ZeppelinHub via websocket API.
+ * Implemented using singleton pattern.
+ *
+ */
+public class Client {
+ private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+ private final ZeppelinhubClient zeppelinhubClient;
+ private final ZeppelinClient zeppelinClient;
+ private static Client instance = null;
+
+ public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token,
+ ZeppelinConfiguration conf) {
+ if (instance == null) {
+ instance = new Client(zeppelinUri, zeppelinhubUri, token, conf);
+ }
+ return instance;
+ }
+
+ public static Client getInstance() {
+ return instance;
+ }
+
+ private Client(String zeppelinUri, String zeppelinhubUri, String token,
+ ZeppelinConfiguration conf) {
+ LOG.debug("Init Client");
+ zeppelinhubClient = ZeppelinhubClient.initialize(zeppelinhubUri, token);
+ zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token, conf);
+ }
+
+ public void start() {
+ if (zeppelinhubClient != null) {
+ zeppelinhubClient.start();
+ }
+ if (zeppelinClient != null) {
+ zeppelinClient.start();
+ }
+ }
+
+ public void stop() {
+ if (zeppelinhubClient != null) {
+ zeppelinhubClient.stop();
+ }
+ if (zeppelinClient != null) {
+ zeppelinClient.stop();
+ }
+ }
+
+ public void relayToZeppelinHub(String message) {
+ zeppelinhubClient.send(message);
+ }
+
+ public void relayToZeppelin(Message message, String noteId) {
+ zeppelinClient.send(message, noteId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
new file mode 100644
index 0000000..8ac6ebe
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHeartbeat;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+/**
+ * Zeppelin websocket client.
+ *
+ */
+public class ZeppelinClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ZeppelinClient.class);
+ private final URI zeppelinWebsocketUrl;
+ private final String zeppelinhubToken;
+ private final WebSocketClient wsClient;
+ private static Gson gson;
+ private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
+ private static ZeppelinClient instance = null;
+ private SchedulerService schedulerService;
+ private Authentication authModule;
+ private static final int min = 60;
+
+ public static ZeppelinClient initialize(String zeppelinUrl, String token,
+ ZeppelinConfiguration conf) {
+ if (instance == null) {
+ instance = new ZeppelinClient(zeppelinUrl, token, conf);
+ }
+ return instance;
+ }
+
+ public static ZeppelinClient getInstance() {
+ return instance;
+ }
+
+ private ZeppelinClient(String zeppelinUrl, String token, ZeppelinConfiguration conf) {
+ zeppelinWebsocketUrl = URI.create(zeppelinUrl);
+ zeppelinhubToken = token;
+ wsClient = createNewWebsocketClient();
+ gson = new Gson();
+ zeppelinConnectionMap = new ConcurrentHashMap<>();
+ schedulerService = SchedulerService.getInstance();
+ authModule = Authentication.initialize(token, conf);
+ if (authModule != null) {
+ SchedulerService.getInstance().addOnce(authModule, 10);
+ }
+ LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
+ }
+
+ private WebSocketClient createNewWebsocketClient() {
+ SslContextFactory sslContextFactory = new SslContextFactory();
+ WebSocketClient client = new WebSocketClient(sslContextFactory);
+ client.setMaxIdleTimeout(5 * min * 1000);
+ //TODO(khalid): other client settings
+ return client;
+ }
+
+ public void start() {
+ try {
+ if (wsClient != null) {
+ wsClient.start();
+ addRoutines();
+ } else {
+ LOG.warn("Cannot start zeppelin websocket client - isn't initialized");
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot start Zeppelin websocket client", e);
+ }
+ }
+
+ private void addRoutines() {
+ schedulerService.add(ZeppelinHeartbeat.newInstance(this), 15, 4 * min);
+ }
+
+ public void stop() {
+ try {
+ if (wsClient != null) {
+ removeAllZeppelinConnections();
+ wsClient.stop();
+ } else {
+ LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot stop Zeppelin websocket client", e);
+ }
+ }
+
+ public String serialize(Message zeppelinMsg) {
+ if (credentialsAvailable()) {
+ zeppelinMsg.principal = authModule.getPrincipal();
+ zeppelinMsg.ticket = authModule.getTicket();
+ zeppelinMsg.roles = authModule.getRoles();
+ }
+ String msg = gson.toJson(zeppelinMsg);
+ return msg;
+ }
+
+ private boolean credentialsAvailable() {
+ return Authentication.getInstance() != null && Authentication.getInstance().isAuthenticated();
+ }
+
+ public Message deserialize(String zeppelinMessage) {
+ if (StringUtils.isBlank(zeppelinMessage)) {
+ return null;
+ }
+ Message msg;
+ try {
+ msg = gson.fromJson(zeppelinMessage, Message.class);
+ } catch (JsonSyntaxException ex) {
+ LOG.error("Cannot deserialize zeppelin message", ex);
+ msg = null;
+ }
+ return msg;
+ }
+
+ public void send(Message msg, String noteId) {
+ Session noteSession = getZeppelinConnection(noteId);
+ if (!isSessionOpen(noteSession)) {
+ LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
+ return;
+ }
+ noteSession.getRemote().sendStringByFuture(serialize(msg));
+ }
+
+ private boolean isSessionOpen(Session session) {
+ return (session != null) && (session.isOpen());
+ }
+
+ /* per notebook based ws connection, returns null if can't connect */
+ public Session getZeppelinConnection(String noteId) {
+ if (StringUtils.isBlank(noteId)) {
+ LOG.warn("Cannot return websocket connection for blank noteId");
+ return null;
+ }
+
+ if (zeppelinConnectionMap.containsKey(noteId)) {
+ LOG.info("Connection for {} exists in map", noteId);
+ return getNoteSession(noteId);
+ }
+ //TODO(khalid): clean log later
+ LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
+ return openNoteSession(noteId);
+ }
+
+ private Message zeppelinGetNoteMsg(String noteId) {
+ Message getNoteMsg = new Message(Message.OP.GET_NOTE);
+ HashMap<String, Object> data = new HashMap<String, Object>();
+ data.put("id", noteId);
+ getNoteMsg.data = data;
+ return getNoteMsg;
+ }
+
+ private Session getNoteSession(String noteId) {
+ Session session = zeppelinConnectionMap.get(noteId);
+ if (session == null || !session.isOpen()) {
+ LOG.info("Not connection to {}", noteId);
+ zeppelinConnectionMap.remove(noteId);
+ session = openNoteSession(noteId);
+ }
+ return session;
+ }
+
+ private Session openNoteSession(String noteId) {
+ ClientUpgradeRequest request = new ClientUpgradeRequest();
+ ZeppelinWebsocket socket = new ZeppelinWebsocket(noteId);
+ Future<Session> future = null;
+ Session session = null;
+ try {
+ future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
+ session = future.get();
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
+ return session;
+ }
+
+ if (zeppelinConnectionMap.containsKey(noteId)) {
+ session.close();
+ session = zeppelinConnectionMap.get(noteId);
+ } else {
+ String getNote = serialize(zeppelinGetNoteMsg(noteId));
+ // TODO(khalid): may need to check return whether successful
+ session.getRemote().sendStringByFuture(getNote);
+ zeppelinConnectionMap.put(noteId, session);
+ }
+ return session;
+ }
+
+ public void handleMsgFromZeppelin(String message, String noteId) {
+ Map<String, String> meta = new HashMap<String, String>();
+ meta.put("token", zeppelinhubToken);
+ meta.put("noteId", noteId);
+ Message zeppelinMsg = deserialize(message);
+ if (zeppelinMsg == null) {
+ return;
+ }
+ ZeppelinhubMessage hubMsg = ZeppelinhubMessage.newMessage(zeppelinMsg, meta);
+ Client client = Client.getInstance();
+ if (client == null) {
+ LOG.warn("Client isn't initialized yet");
+ return;
+ }
+ client.relayToZeppelinHub(hubMsg.serialize());
+ }
+
+ /**
+ * Close and remove ZeppelinConnection
+ */
+ public void removeZeppelinConnection(String noteId) {
+ if (zeppelinConnectionMap.containsKey(noteId)) {
+ Session conn = zeppelinConnectionMap.get(noteId);
+ if (conn.isOpen()) {
+ conn.close();
+ }
+ zeppelinConnectionMap.remove(noteId);
+ }
+ // TODO(khalid): clean log later
+ LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
+ }
+
+ /**
+ * Close and remove all ZeppelinConnection
+ */
+ public void removeAllZeppelinConnections() {
+ for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
+ if (isSessionOpen(entry.getValue())) {
+ entry.getValue().close();
+ }
+ zeppelinConnectionMap.remove(entry.getKey());
+ }
+ LOG.info("Removed all Zeppelin ws connections");
+ }
+
+ public void pingAllNotes() {
+ for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
+ if (isSessionOpen(entry.getValue())) {
+ send(new Message(OP.PING), entry.getKey());
+ } else {
+ // for cleanup
+ zeppelinConnectionMap.remove(entry.getKey());
+ }
+ }
+ }
+
+ public int countConnectedNotes() {
+ return zeppelinConnectionMap.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
new file mode 100644
index 0000000..9bcd035
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+
+import java.io.IOException;
+import java.net.HttpCookie;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinhubWebsocket;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session.ZeppelinhubSession;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.util.json.JSONArray;
+import com.amazonaws.util.json.JSONException;
+import com.amazonaws.util.json.JSONObject;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Manage a zeppelinhub websocket connection.
+ */
+public class ZeppelinhubClient {
+ private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubClient.class);
+
+ private final WebSocketClient client;
+ private final URI zeppelinhubWebsocketUrl;
+ private final ClientUpgradeRequest conectionRequest;
+ private final String zeppelinhubToken;
+
+ private static final int MB = 1048576;
+ private static final int MAXIMUN_TEXT_SIZE = 64 * MB;
+ private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30);
+ private static ZeppelinhubClient instance = null;
+ private static Gson gson;
+
+ private SchedulerService schedulerService;
+ private ZeppelinhubSession zeppelinhubSession;
+
+ public static ZeppelinhubClient initialize(String zeppelinhubUrl, String token) {
+ if (instance == null) {
+ instance = new ZeppelinhubClient(zeppelinhubUrl, token);
+ }
+ return instance;
+ }
+
+ public static ZeppelinhubClient getInstance() {
+ return instance;
+ }
+
+ private ZeppelinhubClient(String url, String token) {
+ zeppelinhubWebsocketUrl = URI.create(url);
+ client = createNewWebsocketClient();
+ conectionRequest = setConnectionrequest(token);
+ zeppelinhubToken = token;
+ schedulerService = SchedulerService.create(10);
+ gson = new Gson();
+ LOG.info("Initialized ZeppelinHub websocket client on {}", zeppelinhubWebsocketUrl);
+ }
+
+ public void start() {
+ try {
+ client.start();
+ zeppelinhubSession = connect();
+ addRoutines();
+ } catch (Exception e) {
+ LOG.error("Cannot connect to zeppelinhub via websocket", e);
+ }
+ }
+
+ public void stop() {
+ LOG.info("Stopping Zeppelinhub websocket client");
+ try {
+ zeppelinhubSession.close();
+ schedulerService.close();
+ client.stop();
+ } catch (Exception e) {
+ LOG.error("Cannot stop zeppelinhub websocket client", e);
+ }
+ }
+
+ public String getToken() {
+ return this.zeppelinhubToken;
+ }
+
+ public void send(String msg) {
+ if (!isConnectedToZeppelinhub()) {
+ LOG.info("Zeppelinhub connection is not open, opening it");
+ zeppelinhubSession = connect();
+ if (zeppelinhubSession == ZeppelinhubSession.EMPTY) {
+ LOG.warn("While connecting to ZeppelinHub received empty session, cannot send the message");
+ return;
+ }
+ }
+ zeppelinhubSession.sendByFuture(msg);
+ }
+
+ private boolean isConnectedToZeppelinhub() {
+ return (zeppelinhubSession != null && zeppelinhubSession.isSessionOpen());
+ }
+
+ private ZeppelinhubSession connect() {
+ ZeppelinhubSession zeppelinSession;
+ try {
+ ZeppelinhubWebsocket ws = ZeppelinhubWebsocket.newInstance(zeppelinhubToken);
+ Future<Session> future = client.connect(ws, zeppelinhubWebsocketUrl, conectionRequest);
+ Session session = future.get();
+ zeppelinSession = ZeppelinhubSession.createInstance(session, zeppelinhubToken);
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ LOG.info("Couldnt connect to zeppelinhub", e);
+ zeppelinSession = ZeppelinhubSession.EMPTY;
+ }
+ return zeppelinSession;
+ }
+
+ private ClientUpgradeRequest setConnectionrequest(String token) {
+ ClientUpgradeRequest request = new ClientUpgradeRequest();
+ request.setCookies(Lists.newArrayList(new HttpCookie(ZeppelinHubRepo.TOKEN_HEADER, token)));
+ return request;
+ }
+
+ private WebSocketClient createNewWebsocketClient() {
+ SslContextFactory sslContextFactory = new SslContextFactory();
+ WebSocketClient client = new WebSocketClient(sslContextFactory);
+ client.setMaxTextMessageBufferSize(MAXIMUN_TEXT_SIZE);
+ client.setMaxIdleTimeout(CONNECTION_IDLE_TIME);
+ return client;
+ }
+
+ private void addRoutines() {
+ schedulerService.add(ZeppelinHubHeartbeat.newInstance(this), 10, 23);
+ }
+
+ public void handleMsgFromZeppelinHub(String message) {
+ ZeppelinhubMessage hubMsg = ZeppelinhubMessage.deserialize(message);
+ if (hubMsg.equals(ZeppelinhubMessage.EMPTY)) {
+ LOG.error("Cannot handle ZeppelinHub message is empty");
+ return;
+ }
+ String op = StringUtils.EMPTY;
+ if (hubMsg.op instanceof String) {
+ op = (String) hubMsg.op;
+ } else {
+ LOG.error("Message OP from ZeppelinHub isn't string {}", hubMsg.op);
+ return;
+ }
+ if (ZeppelinhubUtils.isZeppelinHubOp(op)) {
+ handleZeppelinHubOpMsg(ZeppelinhubUtils.toZeppelinHubOp(op), hubMsg, message);
+ } else if (ZeppelinhubUtils.isZeppelinOp(op)) {
+ forwardToZeppelin(ZeppelinhubUtils.toZeppelinOp(op), hubMsg);
+ }
+ }
+
+ private void handleZeppelinHubOpMsg(ZeppelinHubOp op, ZeppelinhubMessage hubMsg, String msg) {
+ if (op == null || msg.equals(ZeppelinhubMessage.EMPTY)) {
+ LOG.error("Cannot handle empty op or msg");
+ return;
+ }
+ switch (op) {
+ case RUN_NOTEBOOK:
+ runAllParagraph(hubMsg.meta.get("noteId"), msg);
+ break;
+ default:
+ LOG.warn("Received {} from ZeppelinHub, not handled", op);
+ break;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void forwardToZeppelin(Message.OP op, ZeppelinhubMessage hubMsg) {
+ Message zeppelinMsg = new Message(op);
+ if (!(hubMsg.data instanceof Map)) {
+ LOG.error("Data field of message from ZeppelinHub isn't in correct Map format");
+ return;
+ }
+ zeppelinMsg.data = (Map<String, Object>) hubMsg.data;
+ Client client = Client.getInstance();
+ if (client == null) {
+ LOG.warn("Base client isn't initialized, returning");
+ return;
+ }
+ client.relayToZeppelin(zeppelinMsg, hubMsg.meta.get("noteId"));
+ }
+
+ boolean runAllParagraph(String noteId, String hubMsg) {
+ LOG.info("Running paragraph with noteId {}", noteId);
+ try {
+ JSONObject data = new JSONObject(hubMsg);
+ if (data.equals(JSONObject.NULL) || !(data.get("data") instanceof JSONArray)) {
+ LOG.error("Wrong \"data\" format for RUN_NOTEBOOK");
+ return false;
+ }
+ Client client = Client.getInstance();
+ if (client == null) {
+ LOG.warn("Base client isn't initialized, returning");
+ return false;
+ }
+ Message zeppelinMsg = new Message(OP.RUN_PARAGRAPH);
+
+ JSONArray paragraphs = data.getJSONArray("data");
+ for (int i = 0; i < paragraphs.length(); i++) {
+ if (!(paragraphs.get(i) instanceof JSONObject)) {
+ LOG.warn("Wrong \"paragraph\" format for RUN_NOTEBOOK");
+ continue;
+ }
+ zeppelinMsg.data = gson.fromJson(paragraphs.getString(i),
+ new TypeToken<Map<String, Object>>(){}.getType());
+ client.relayToZeppelin(zeppelinMsg, noteId);
+ LOG.info("\nSending RUN_PARAGRAPH message to Zeppelin ");
+ }
+ } catch (JSONException e) {
+ LOG.error("Failed to parse RUN_NOTEBOOK message from ZeppelinHub ", e);
+ return false;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
new file mode 100644
index 0000000..facfcab
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
+
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeppelin websocket listener class.
+ *
+ */
+public class ZeppelinWebsocket implements WebSocketListener {
+ private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
+ public Session connection;
+ public String noteId;
+
+ public ZeppelinWebsocket(String noteId) {
+ this.noteId = noteId;
+ }
+
+ @Override
+ public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
+
+ }
+
+ @Override
+ public void onWebSocketClose(int code, String message) {
+ LOG.info("Zeppelin connection closed with code: {}, message: {}", code, message);
+ // parentClient.removeConnMap(noteId);
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ LOG.info("Zeppelin connection opened");
+ this.connection = session;
+ }
+
+ @Override
+ public void onWebSocketError(Throwable e) {
+ LOG.warn("Zeppelin socket connection error ", e);
+ }
+
+ @Override
+ public void onWebSocketText(String data) {
+ LOG.debug("Zeppelin client received Message: " + data);
+ // propagate to ZeppelinHub
+ try {
+ ZeppelinClient zeppelinClient = ZeppelinClient.getInstance();
+ if (zeppelinClient != null) {
+ zeppelinClient.handleMsgFromZeppelin(data, noteId);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to send message to ZeppelinHub: ", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java
new file mode 100644
index 0000000..e28054a
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
+
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeppelinhub websocket handler.
+ */
+public class ZeppelinhubWebsocket implements WebSocketListener {
+ private Logger LOG = LoggerFactory.getLogger(ZeppelinhubWebsocket.class);
+ private Session zeppelinHubSession;
+ private final String token;
+
+ private ZeppelinhubWebsocket(String token) {
+ this.token = token;
+ }
+
+ public static ZeppelinhubWebsocket newInstance(String token) {
+ return new ZeppelinhubWebsocket(token);
+ }
+
+ @Override
+ public void onWebSocketBinary(byte[] payload, int offset, int len) {}
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason) {
+ LOG.info("Closing websocket connection [{}] : {}", statusCode, reason);
+ send(ZeppelinhubUtils.deadMessage(token));
+ this.zeppelinHubSession = null;
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ LOG.info("Opening a new session to Zeppelinhub {}", session.hashCode());
+ this.zeppelinHubSession = session;
+ send(ZeppelinhubUtils.liveMessage(token));
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ LOG.error("Got error", cause);
+ }
+
+ @Override
+ public void onWebSocketText(String message) {
+ // handle message from ZeppelinHub.
+ ZeppelinhubClient client = ZeppelinhubClient.getInstance();
+ if (client != null) {
+ client.handleMsgFromZeppelinHub(message);
+ }
+ }
+
+ private boolean isSessionOpen() {
+ return ((zeppelinHubSession != null) && (zeppelinHubSession.isOpen())) ? true : false;
+ }
+
+ private void send(String msg) {
+ if (isSessionOpen()) {
+ zeppelinHubSession.getRemote().sendStringByFuture(msg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java
new file mode 100644
index 0000000..80d5f06
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
+
+/**
+ * Zeppelinhub Op.
+ */
+public enum ZeppelinHubOp {
+ LIVE,
+ DEAD,
+ PING,
+ PONG,
+ RUN_NOTEBOOK,
+ WELCOME,
+ ZEPPELIN_STATUS
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java
new file mode 100644
index 0000000..aa1ce21
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+/**
+ * Zeppelinhub message class.
+ *
+ */
+public class ZeppelinhubMessage {
+ private static final Gson gson = new Gson();
+ private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+ public static final ZeppelinhubMessage EMPTY = new ZeppelinhubMessage();
+
+ public Object op;
+ public Object data;
+ public Map<String, String> meta = Maps.newHashMap();
+
+ private ZeppelinhubMessage() {
+ this.op = OP.LIST_NOTES;
+ this.data = null;
+ }
+
+ private ZeppelinhubMessage(Object op, Object data, Map<String, String> meta) {
+ this.op = op;
+ this.data = data;
+ this.meta = meta;
+ }
+
+ public static ZeppelinhubMessage newMessage(Object op, Object data, Map<String, String> meta) {
+ return new ZeppelinhubMessage(op, data, meta);
+ }
+
+ public static ZeppelinhubMessage newMessage(Message zeppelinMsg, Map<String, String> meta) {
+ if (zeppelinMsg == null) {
+ return EMPTY;
+ }
+ return new ZeppelinhubMessage(zeppelinMsg.op, zeppelinMsg.data, meta);
+ }
+
+ public String serialize() {
+ return gson.toJson(this, ZeppelinhubMessage.class);
+ }
+
+ public static ZeppelinhubMessage deserialize(String zeppelinhubMessage) {
+ if (StringUtils.isBlank(zeppelinhubMessage)) {
+ return EMPTY;
+ }
+ ZeppelinhubMessage msg;
+ try {
+ msg = gson.fromJson(zeppelinhubMessage, ZeppelinhubMessage.class);
+ } catch (JsonSyntaxException ex) {
+ LOG.error("Cannot deserialize zeppelinhub message", ex);
+ msg = EMPTY;
+ }
+ return msg;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java
new file mode 100644
index 0000000..024a3c0
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Creates a thread pool that can schedule zeppelinhub commands.
+ *
+ */
+public class SchedulerService {
+
+ private final ScheduledExecutorService pool;
+ private static SchedulerService instance = null;
+
+ private SchedulerService(int numberOfThread) {
+ pool = Executors.newScheduledThreadPool(numberOfThread);
+ }
+
+ public static SchedulerService create(int numberOfThread) {
+ if (instance == null) {
+ instance = new SchedulerService(numberOfThread);
+ }
+ return instance;
+ }
+
+ public static SchedulerService getInstance() {
+ if (instance == null) {
+ instance = new SchedulerService(2);
+ }
+ return instance;
+ }
+
+ public void add(Runnable service, int firstExecution, int period) {
+ pool.scheduleAtFixedRate(service, firstExecution, period, TimeUnit.SECONDS);
+ }
+
+ public void addOnce(Runnable service, int firstExecution) {
+ pool.schedule(service, firstExecution, TimeUnit.SECONDS);
+ }
+
+ public void close() {
+ pool.shutdown();
+ }
+
+}