You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by an...@apache.org on 2016/05/28 19:13:47 UTC

[2/2] incubator-zeppelin git commit: ZeppelinHub notebook storage/connection repository

ZeppelinHub notebook storage/connection repository

### What is this PR for?
This is to add [ZeppelinHub](https://www.zeppelinhub.com) notebook storage/connection layer to the Zeppelin.

### What type of PR is it?
Feature

### Todos
* [x] - NotebookRepo rest api
* [x] - ZeppelinHub websocket client
* [x] - Zeppelin websocket client
* [x] - Tests
* [x] - More QA (authentication consistency, etc.)
* [x] - Address review comments

### What is the Jira issue?

### How should this be tested?
First of all, you may need to create account in [ZeppelinHub](https://www.zeppelinhub.com).
Then you can set connection by following guides in [here](https://github.com/khalidhuseynov/incubator-zeppelin/blob/feat/zeppelinhub-storage/docs/storage/storage.md#notebook-storage-in-zeppelinhub--).
Finally you should be able to access and manipulate your notebooks from inside of your ZeppelinHub account.

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes

Author: Khalid Huseynov <kh...@nflabs.com>
Author: Anthony Corbacho <co...@gmail.com>

Closes #880 from khalidhuseynov/feat/zeppelinhub-storage and squashes the following commits:

b480176 [Anthony Corbacho] Change debug log from info to error
db8d1e8 [Anthony Corbacho] Remove spave after comment
1c78d8c [Khalid Huseynov] address @bzz comments
b1fe8a3 [Khalid Huseynov] move jetty.version property to root
acf58d5 [Khalid Huseynov] address @AhyoungRyu comments
8de3d1f [Khalid Huseynov] add zeppelin  hearbeat
6c3aff2 [Khalid Huseynov] remove 074098eeb8ace6545c159d26657768079ae4b208 and ZeppelinHubConnection
efcfca3 [Khalid Huseynov] more docs
28f6bf7 [Khalid Huseynov] add authentication to zeppelin
1ca3d65 [Khalid Huseynov] scheduler service as singleton
32497dc [Anthony Corbacho] When user run a notebook, check if the websocket session exist, open it if close
36176ea [Anthony Corbacho] change log info to debug
8ad482d [Anthony Corbacho] Add routine to check ws connection to zeppelinhub is open, if not open it (keep trying)
27a4042 [Anthony Corbacho] rename fct hub -> ZeppelinHub
4c47e43 [Anthony Corbacho] code cleanup & walkthrough
636b9fa [Khalid Huseynov] fix tests
4b14a98 [Anthony Corbacho] add ASF headers
4ac8115 [Khalid Huseynov] add config info and docs
18ee802 [Khalid Huseynov] improve  repo tests
2d27ec6 [Khalid Huseynov] add zeppelinhub ws client test
ebcf692 [Khalid Huseynov] add zeppelin client tests
9f1b8bf [Khalid Huseynov] add zeppelin websocket client
1ce01ef [Khalid Huseynov] add rest package
45bec47 [Anthony Corbacho] add zeppelinhub websocket client
f8f168a [Anthony Corbacho] Add zeppelinhubmessage class + tests
a1c5f52 [Anthony Corbacho] prerequisite changes for starting websocket client
45ed9e1 [Khalid Huseynov] initial rest api


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/8cde5c9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/8cde5c9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/8cde5c9b

Branch: refs/heads/master
Commit: 8cde5c9bd47e3e9682d016274a808e096ab34d0b
Parents: 2154476
Author: Khalid Huseynov <kh...@nflabs.com>
Authored: Sat May 28 11:43:07 2016 -0700
Committer: Anthony Corbacho <co...@gmail.com>
Committed: Sat May 28 12:13:11 2016 -0700

----------------------------------------------------------------------
 conf/zeppelin-env.cmd.template                  |   6 +
 conf/zeppelin-env.sh.template                   |   8 +-
 conf/zeppelin-site.xml.template                 |  11 +-
 docs/_includes/themes/zeppelin/_navigation.html |   1 +
 docs/storage/storage.md                         |  31 ++
 pom.xml                                         |   1 +
 zeppelin-server/pom.xml                         |   1 -
 .../org/apache/zeppelin/socket/Message.java     | 151 ----------
 .../apache/zeppelin/socket/NotebookServer.java  |   3 +-
 .../zeppelin/socket/NotebookServerTest.java     |   3 +-
 zeppelin-zengine/pom.xml                        |  39 +++
 .../repo/zeppelinhub/ZeppelinHubRepo.java       | 195 +++++++++++++
 .../rest/ZeppelinhubRestApiHandler.java         | 210 ++++++++++++++
 .../zeppelinhub/security/Authentication.java    | 228 +++++++++++++++
 .../repo/zeppelinhub/websocket/Client.java      |  79 +++++
 .../zeppelinhub/websocket/ZeppelinClient.java   | 286 +++++++++++++++++++
 .../websocket/ZeppelinhubClient.java            | 251 ++++++++++++++++
 .../websocket/listener/ZeppelinWebsocket.java   |  74 +++++
 .../listener/ZeppelinhubWebsocket.java          |  82 ++++++
 .../websocket/protocol/ZeppelinHubOp.java       |  30 ++
 .../websocket/protocol/ZeppelinhubMessage.java  |  85 ++++++
 .../websocket/scheduler/SchedulerService.java   |  62 ++++
 .../websocket/scheduler/ZeppelinHeartbeat.java  |  44 +++
 .../scheduler/ZeppelinHubHeartbeat.java         |  45 +++
 .../websocket/session/ZeppelinhubSession.java   |  63 ++++
 .../websocket/utils/ZeppelinhubUtils.java       |  98 +++++++
 .../zeppelin/notebook/socket/Message.java       | 151 ++++++++++
 .../repo/zeppelinhub/ZeppelinHubRepoTest.java   | 150 ++++++++++
 .../websocket/ZeppelinClientTest.java           | 123 ++++++++
 .../websocket/ZeppelinhubClientTest.java        |  72 +++++
 .../websocket/mock/MockEchoWebsocketServer.java |  46 +++
 .../websocket/mock/MockEventServlet.java        |  14 +
 .../websocket/mock/MockEventSocket.java         |  38 +++
 .../protocol/ZeppelinhubMessageTest.java        |  43 +++
 .../src/test/resources/list_of_notes            |  41 +++
 zeppelin-zengine/src/test/resources/note        |  16 ++
 36 files changed, 2625 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/conf/zeppelin-env.cmd.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.cmd.template b/conf/zeppelin-env.cmd.template
index 06799b5..59953dc 100644
--- a/conf/zeppelin-env.cmd.template
+++ b/conf/zeppelin-env.cmd.template
@@ -34,6 +34,7 @@ REM set ZEPPELIN_NOTEBOOK_S3_USER              REM User in bucket where notebook
 REM set ZEPPELIN_IDENT_STRING   		REM A string representing this instance of zeppelin. $USER by default.
 REM set ZEPPELIN_NICENESS       		REM The scheduling priority for daemons. Defaults to 0.
 REM set ZEPPELIN_INTERPRETER_LOCALREPO         REM Local repository for interpreter's additional dependency loading
+REM set ZEPPELIN_NOTEBOOK_STORAGE		REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
 
 
 REM Spark interpreter configuration
@@ -62,3 +63,8 @@ REM set ZEPPELIN_SPARK_USEHIVECONTEXT  REM Use HiveContext instead of SQLContext
 REM set ZEPPELIN_SPARK_CONCURRENTSQL   REM Execute multiple SQL concurrently if set true. false by default.
 REM set ZEPPELIN_SPARK_MAXRESULT       REM Max number of SparkSQL result to display. 1000 by default.
 
+REM ZeppelinHub connection configuration
+REM
+REM set ZEPPELINHUB_API_ADDRESS	       REM Refers to the address of the ZeppelinHub service in use
+REM set ZEPPELINHUB_API_TOKEN          REM Refers to the Zeppelin instance token of the user
+REM set ZEPPELINHUB_USER_KEY           REM Optional, when using Zeppelin with authentication.

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index 6279de7..be6f3dd 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -35,7 +35,7 @@
 # export ZEPPELIN_IDENT_STRING   		# A string representing this instance of zeppelin. $USER by default.
 # export ZEPPELIN_NICENESS       		# The scheduling priority for daemons. Defaults to 0.
 # export ZEPPELIN_INTERPRETER_LOCALREPO         # Local repository for interpreter's additional dependency loading
-
+# export ZEPPELIN_NOTEBOOK_STORAGE 		# Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
 
 #### Spark interpreter configuration ####
 
@@ -70,3 +70,9 @@
 
 # export HBASE_HOME=                    # (require) Under which HBase scripts and configuration should be
 # export HBASE_CONF_DIR=                # (optional) Alternatively, configuration directory can be set to point to the directory that has hbase-site.xml
+
+#### ZeppelinHub connection configuration ####
+# export ZEPPELINHUB_API_ADDRESS		# Refers to the address of the ZeppelinHub service in use
+# export ZEPPELINHUB_API_TOKEN			# Refers to the Zeppelin instance token of the user
+# export ZEPPELINHUB_USER_KEY			# Optional, when using Zeppelin with authentication.
+

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index f7a5405..bd61153 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -141,7 +141,7 @@
 </property>
 -->
 
-<!-- For versioning your local norebook storage using Git repository
+<!-- For versioning your local notebook storage using Git repository
 <property>
   <name>zeppelin.notebook.storage</name>
   <value>org.apache.zeppelin.notebook.repo.GitNotebookRepo</value>
@@ -149,6 +149,15 @@
 </property>
 -->
 
+<!-- For connecting your Zeppelin with ZeppelinHub -->
+<!--
+<property>
+  <name>zeppelin.notebook.storage</name>
+  <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo, org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo</value>
+  <description>two notebook persistence layers (local + ZeppelinHub)</description>
+</property>
+-->
+
 <property>
   <name>zeppelin.notebook.storage</name>
   <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</value>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/docs/_includes/themes/zeppelin/_navigation.html
----------------------------------------------------------------------
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index 10d7be0..8a46b74 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -87,6 +87,7 @@
                 <li><a href="{{BASE_PATH}}/storage/storage.html#Git">Git Storage</a></li>
                 <li><a href="{{BASE_PATH}}/storage/storage.html#S3">S3 Storage</a></li>
                 <li><a href="{{BASE_PATH}}/storage/storage.html#Azure">Azure Storage</a></li>
+                <li><a href="{{BASE_PATH}}/storage/storage.html#ZeppelinHub">ZeppelinHub Storage</a></li>
                 <li role="separator" class="divider"></li>
                 <!-- li><span><b>REST API</b><span></li -->
                 <li><a href="{{BASE_PATH}}/rest-api/rest-interpreter.html">Interpreter API</a></li>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/docs/storage/storage.md
----------------------------------------------------------------------
diff --git a/docs/storage/storage.md b/docs/storage/storage.md
index ea534bf..25c62bb 100644
--- a/docs/storage/storage.md
+++ b/docs/storage/storage.md
@@ -201,3 +201,34 @@ Optionally, you can specify Azure folder structure name in the file **zeppelin-s
   <description>optional user name for Azure folder structure</description>
 </property>
 ```
+
+</br>
+#### Notebook Storage in ZeppelinHub  <a name="ZeppelinHub"></a>
+
+ZeppelinHub storage layer allows out of the box connection of Zeppelin instance with your ZeppelinHub account. First of all, you need to either comment out the following  property in **zeppelin-site.xml**:
+
+```
+<!-- For connecting your Zeppelin with ZeppelinHub -->
+<!--
+<property>
+  <name>zeppelin.notebook.storage</name>
+  <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo, org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo</value>
+  <description>two notebook persistence layers (local + ZeppelinHub)</description>
+</property>
+-->
+```
+
+or set the environment variable in the file **zeppelin-env.sh**:
+
+```
+export ZEPPELIN_NOTEBOOK_STORAGE="org.apache.zeppelin.notebook.repo.VFSNotebookRepo, org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo"
+```
+
+Secondly, you need to set the environment variables in the file **zeppelin-env.sh**:
+
+```
+export ZEPPELINHUB_API_TOKEN = ZeppelinHub token
+export ZEPPELINHUB_API_ADDRESS = address of ZeppelinHub service (e.g. https://www.zeppelinhub.com)
+```
+
+You can get more information on generating `token` and using authentication on the corresponding [help page](http://help.zeppelinhub.com/zeppelin_integration/#add-a-new-zeppelin-instance-and-generate-a-token).

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 77f3667..28508e7 100755
--- a/pom.xml
+++ b/pom.xml
@@ -118,6 +118,7 @@
     <libthrift.version>0.9.2</libthrift.version>
     <gson.version>2.2</gson.version>
     <guava.version>15.0</guava.version>
+    <jetty.version>9.2.15.v20160210</jetty.version>
 
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index b1c0cef..19a95e4 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -35,7 +35,6 @@
 
   <properties>
     <cxf.version>2.7.7</cxf.version>
-    <jetty.version>9.2.15.v20160210</jetty.version>
     <commons.httpclient.version>4.3.6</commons.httpclient.version>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
deleted file mode 100644
index 913e184..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.socket;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Zeppelin websocker massage template class.
- */
-public class Message {
-  /**
-   * Representation of event type.
-   */
-  public static enum OP {
-    GET_HOME_NOTE, // [c-s] load note for home screen
-
-    GET_NOTE, // [c-s] client load note
-              // @param id note id
-
-    NOTE, // [s-c] note info
-          // @param note serlialized Note object
-
-    PARAGRAPH, // [s-c] paragraph info
-               // @param paragraph serialized paragraph object
-
-    PROGRESS, // [s-c] progress update
-              // @param id paragraph id
-              // @param progress percentage progress
-
-    NEW_NOTE, // [c-s] create new notebook
-    DEL_NOTE, // [c-s] delete notebook
-              // @param id note id
-    CLONE_NOTE, // [c-s] clone new notebook
-                // @param id id of note to clone
-                // @param name name fpor the cloned note
-    IMPORT_NOTE,  // [c-s] import notebook
-                  // @param object notebook
-    NOTE_UPDATE,
-
-    RUN_PARAGRAPH, // [c-s] run paragraph
-                   // @param id paragraph id
-                  // @param paragraph paragraph content.ie. script
-                  // @param config paragraph config
-                  // @param params paragraph params
-
-    COMMIT_PARAGRAPH, // [c-s] commit paragraph
-                      // @param id paragraph id
-                      // @param title paragraph title
-                      // @param paragraph paragraph content.ie. script
-                      // @param config paragraph config
-                      // @param params paragraph params
-
-    CANCEL_PARAGRAPH, // [c-s] cancel paragraph run
-                      // @param id paragraph id
-
-    MOVE_PARAGRAPH, // [c-s] move paragraph order
-                    // @param id paragraph id
-                    // @param index index the paragraph want to go
-
-    INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph
-                      // @param target index
-
-    COMPLETION, // [c-s] ask completion candidates
-                // @param id
-                // @param buf current code
-                // @param cursor cursor position in code
-
-    COMPLETION_LIST, // [s-c] send back completion candidates list
-                     // @param id
-                     // @param completions list of string
-
-    LIST_NOTES, // [c-s] ask list of note
-    RELOAD_NOTES_FROM_REPO, // [c-s] reload notes from repo
-
-    NOTES_INFO, // [s-c] list of note infos
-                // @param notes serialized List<NoteInfo> object
-
-    PARAGRAPH_REMOVE,
-    PARAGRAPH_CLEAR_OUTPUT,
-    PARAGRAPH_APPEND_OUTPUT,  // [s-c] append output
-    PARAGRAPH_UPDATE_OUTPUT,  // [s-c] update (replace) output
-    PING,
-    AUTH_INFO,
-
-    ANGULAR_OBJECT_UPDATE,  // [s-c] add/update angular object
-    ANGULAR_OBJECT_REMOVE,  // [s-c] add angular object del
-    
-    ANGULAR_OBJECT_UPDATED,  // [c-s] angular object value updated,
-
-    ANGULAR_OBJECT_CLIENT_BIND,  // [c-s] angular object updated from AngularJS z object
-
-    ANGULAR_OBJECT_CLIENT_UNBIND,  // [c-s] angular object unbind from AngularJS z object
-
-    LIST_CONFIGURATIONS, // [c-s] ask all key/value pairs of configurations
-    CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
-                  // @param settings serialized Map<String, String> object
-
-    CHECKPOINT_NOTEBOOK     // [c-s] checkpoint notebook to storage repository
-                            // @param noteId
-                            // @param checkpointName
-
-  }
-
-  public OP op;
-  public Map<String, Object> data = new HashMap<String, Object>();
-  public String ticket = "anonymous";
-  public String principal = "anonymous";
-  public String roles = "";
-
-  public Message(OP op) {
-    this.op = op;
-  }
-
-  public Message put(String k, Object v) {
-    data.put(k, v);
-    return this;
-  }
-
-  public Object get(String k) {
-    return data.get(k);
-  }
-
-  public <T> T getType(String key) {
-    return (T) data.get(key);
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder("Message{");
-    sb.append("data=").append(data);
-    sb.append(", op=").append(op);
-    sb.append('}');
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 05a0ae8..7d50809 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -42,10 +42,11 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.notebook.*;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
-import org.apache.zeppelin.socket.Message.OP;
 import org.apache.zeppelin.ticket.TicketContainer;
 import org.apache.zeppelin.utils.SecurityUtils;
 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 4cf57e3..b904b68 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -30,9 +30,10 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
 import org.apache.zeppelin.server.ZeppelinServer;
-import org.apache.zeppelin.socket.Message.OP;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 9be82ec..30e211d 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -128,6 +128,24 @@
     </dependency>
 
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-client</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>websocket-client</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>websocket-client</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.quartz-scheduler</groupId>
       <artifactId>quartz</artifactId>
       <version>2.2.1</version>
@@ -221,6 +239,27 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>${jetty.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>${jetty.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>websocket-server</artifactId>
+      <version>${jetty.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
new file mode 100644
index 0000000..3d344b4
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * ZeppelinHub repo class.
+ */
+public class ZeppelinHubRepo implements NotebookRepo {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
+  private static final String DEFAULT_SERVER = "https://www.zeppelinhub.com";
+  static final String ZEPPELIN_CONF_PROP_NAME_SERVER = "zeppelinhub.api.address";
+  static final String ZEPPELIN_CONF_PROP_NAME_TOKEN = "zeppelinhub.api.token";
+  public static final String TOKEN_HEADER = "X-Zeppelin-Token";
+  private static final Gson GSON = new Gson();
+  private static final Note EMPTY_NOTE = new Note();
+  private final Client websocketClient;
+
+  private String token;
+  private ZeppelinhubRestApiHandler restApiClient;
+
+  public ZeppelinHubRepo(ZeppelinConfiguration conf) {
+    String zeppelinHubUrl = getZeppelinHubUrl(conf);
+    LOG.info("Initializing ZeppelinHub integration module");
+    token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
+    restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
+
+    websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
+        getZeppelinhubWebsocketUri(conf), token, conf);
+    websocketClient.start();
+  }
+
+  private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
+    URI apiRoot = api;
+    String scheme = apiRoot.getScheme();
+    int port = apiRoot.getPort();
+    if (port <= 0) {
+      port = (scheme != null && scheme.equals("https")) ? 443 : 80;
+    }
+
+    if (scheme == null) {
+      LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
+          apiRoot, DEFAULT_SERVER);
+      apiRoot = new URI(DEFAULT_SERVER);
+      scheme = apiRoot.getScheme();
+      port = apiRoot.getPort();
+      if (port <= 0) {
+        port = (scheme != null && scheme.equals("https")) ? 443 : 80;
+      }
+    }
+    String ws = scheme.equals("https") ? "wss://" : "ws://";
+    return ws + apiRoot.getHost() + ":" + port + "/async";
+  }
+
+  String getZeppelinhubWebsocketUri(ZeppelinConfiguration conf) {
+    String zeppelinHubUri = StringUtils.EMPTY;
+    try {
+      zeppelinHubUri = getZeppelinHubWsUri(new URI(conf.getString("ZEPPELINHUB_API_ADDRESS",
+          ZEPPELIN_CONF_PROP_NAME_SERVER, DEFAULT_SERVER)));
+    } catch (URISyntaxException e) {
+      LOG.error("Cannot get ZeppelinHub URI", e);
+    }
+    return zeppelinHubUri;
+  }
+
+  private String getZeppelinWebsocketUri(ZeppelinConfiguration conf) {
+    int port = conf.getServerPort();
+    if (port <= 0) {
+      port = 80;
+    }
+    String ws = conf.useSsl() ? "wss" : "ws";
+    return ws + "://localhost:" + port + "/ws";
+  }
+
+  // Used in tests
+  void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
+    restApiClient = zeppelinhub;
+  }
+
+  String getZeppelinHubUrl(ZeppelinConfiguration conf) {
+    if (conf == null) {
+      LOG.error("Invalid configuration, cannot be null. Using default address {}", DEFAULT_SERVER);
+      return DEFAULT_SERVER;
+    }
+    URI apiRoot;
+    String zeppelinhubUrl;
+    try {
+      String url = conf.getString("ZEPPELINHUB_API_ADDRESS",
+                                  ZEPPELIN_CONF_PROP_NAME_SERVER,
+                                  DEFAULT_SERVER);
+      apiRoot = new URI(url);
+    } catch (URISyntaxException e) {
+      LOG.error("Invalid zeppelinhub url, using default address {}", DEFAULT_SERVER, e);
+      return DEFAULT_SERVER;
+    }
+
+    String scheme = apiRoot.getScheme();
+    if (scheme == null) {
+      LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
+               apiRoot, DEFAULT_SERVER);
+      zeppelinhubUrl = DEFAULT_SERVER;
+    } else {
+      zeppelinhubUrl = scheme + "://" + apiRoot.getHost();
+      if (apiRoot.getPort() > 0) {
+        zeppelinhubUrl += ":" + apiRoot.getPort();
+      }
+    }
+    return zeppelinhubUrl;
+  }
+
+  @Override
+  public List<NoteInfo> list() throws IOException {
+    String response = restApiClient.asyncGet("");
+    List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
+    if (notes == null) {
+      return Collections.emptyList();
+    }
+    LOG.info("ZeppelinHub REST API listing notes ");
+    return notes;
+  }
+
+  @Override
+  public Note get(String noteId) throws IOException {
+    if (StringUtils.isBlank(noteId)) {
+      return EMPTY_NOTE;
+    }
+    //String response = zeppelinhubHandler.get(noteId);
+    String response = restApiClient.asyncGet(noteId);
+    Note note = GSON.fromJson(response, Note.class);
+    if (note == null) {
+      return EMPTY_NOTE;
+    }
+    LOG.info("ZeppelinHub REST API get note {} ", noteId);
+    return note;
+  }
+
+  @Override
+  public void save(Note note) throws IOException {
+    if (note == null) {
+      throw new IOException("Zeppelinhub failed to save empty note");
+    }
+    String notebook = GSON.toJson(note);
+    restApiClient.asyncPut(notebook);
+    LOG.info("ZeppelinHub REST API saving note {} ", note.id()); 
+  }
+
+  @Override
+  public void remove(String noteId) throws IOException {
+    restApiClient.asyncDel(noteId);
+    LOG.info("ZeppelinHub REST API removing note {} ", noteId);
+  }
+
+  @Override
+  public void close() {
+    websocketClient.stop();
+  }
+
+  @Override
+  public void checkpoint(String noteId, String checkPointName) throws IOException {
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
new file mode 100644
index 0000000..8f9b2e5
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.InputStreamResponseListener;
+import org.eclipse.jetty.client.util.StringContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST API handler.
+ *
+ */
+public class ZeppelinhubRestApiHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
+  public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
+  private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
+  private static boolean PROXY_ON = false;
+  private static String PROXY_HOST;
+  private static int PROXY_PORT;
+
+  private final HttpClient client;
+  private final String zepelinhubUrl;
+  private final String token;
+
+  public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl,
+      String token) {
+    return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
+  }
+
+  private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
+    this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
+    this.token = token;
+
+    //TODO(khalid):to make proxy conf consistent with Zeppelin confs
+    //readProxyConf();
+    client = getAsyncClient();
+
+    try {
+      client.start();
+    } catch (Exception e) {
+      LOG.error("Cannot initialize ZeppelinHub REST async client", e);
+    }
+
+  }
+
+  private void readProxyConf() {
+    //try reading http_proxy
+    String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
+        System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
+    if (StringUtils.isBlank(proxyHostString)) {
+      //try https_proxy if no http_proxy
+      proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
+          System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
+    }
+
+    if (StringUtils.isBlank(proxyHostString)) {
+      PROXY_ON = false;
+    } else {
+      // host format - http://domain:port/
+      String[] parts = proxyHostString.replaceAll("/", "").split(":");
+      if (parts.length != 3) {
+        LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString);
+        PROXY_ON = false;
+        return;
+      }
+      PROXY_HOST = parts[1];
+      PROXY_PORT = Integer.parseInt(parts[2]);
+      LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]);
+      PROXY_ON = true;
+    }
+  }
+
+  private HttpClient getAsyncClient() {
+    SslContextFactory sslContextFactory = new SslContextFactory();
+    HttpClient httpClient = new HttpClient(sslContextFactory);
+
+    // Configure HttpClient
+    httpClient.setFollowRedirects(false);
+    httpClient.setMaxConnectionsPerDestination(100);
+    // Config considerations
+    //TODO(khalid): consider using proxy
+    //TODO(khalid): consider whether require to follow redirects
+    //TODO(khalid): consider multi-threaded connection manager case
+
+    return httpClient;
+  }
+
+  public String asyncGet(String argument) throws IOException {
+    String note = StringUtils.EMPTY;
+
+    InputStreamResponseListener listener = new InputStreamResponseListener();
+    client.newRequest(zepelinhubUrl + argument)
+          .header(ZEPPELIN_TOKEN_HEADER, token)
+          .send(listener);
+
+    // Wait for the response headers to arrive
+    Response response;
+    try {
+      response = listener.get(30, TimeUnit.SECONDS);
+    } catch (InterruptedException | TimeoutException | ExecutionException e) {
+      LOG.error("Cannot perform Get request to ZeppelinHub", e);
+      throw new IOException("Cannot load note from ZeppelinHub", e);
+    }
+
+    int code = response.getStatus();
+    if (code == 200) {
+      try (InputStream responseContent = listener.getInputStream()) {
+        note = IOUtils.toString(responseContent, "UTF-8");
+      }
+    } else {
+      LOG.error("ZeppelinHub Get {} returned with status {} ", zepelinhubUrl + argument, code);
+      throw new IOException("Cannot load note from ZeppelinHub");
+    }
+    return note;
+  }
+
+  public void asyncPut(String jsonNote) throws IOException {
+    if (StringUtils.isBlank(jsonNote)) {
+      LOG.error("Cannot save empty note/string to ZeppelinHub");
+      return;
+    }
+
+    client.newRequest(zepelinhubUrl).method(HttpMethod.PUT)
+        .header(ZEPPELIN_TOKEN_HEADER, token)
+        .content(new StringContentProvider(jsonNote, "UTF-8"), "application/json;charset=UTF-8")
+        .send(new BufferingResponseListener() {
+
+          @Override
+          public void onComplete(Result res) {
+            if (!res.isFailed() && res.getResponse().getStatus() == 200) {
+              LOG.info("Successfully saved note to ZeppelinHub with {}",
+                  res.getResponse().getStatus());
+            } else {
+              LOG.warn("Failed to save note to ZeppelinHub with HttpStatus {}",
+                  res.getResponse().getStatus());
+            }
+          }
+
+          @Override
+          public void onFailure(Response response, Throwable failure) {
+            LOG.error("Failed to save note to ZeppelinHub: {}", response.getReason(), failure);
+          }
+        });
+  }
+
+  public void asyncDel(String argument) {
+    if (StringUtils.isBlank(argument)) {
+      LOG.error("Cannot delete empty note from ZeppelinHub");
+      return;
+    }
+    client.newRequest(zepelinhubUrl + argument)
+        .method(HttpMethod.DELETE)
+        .header(ZEPPELIN_TOKEN_HEADER, token)
+        .send(new BufferingResponseListener() {
+
+          @Override
+          public void onComplete(Result res) {
+            if (!res.isFailed() && res.getResponse().getStatus() == 200) {
+              LOG.info("Successfully removed note from ZeppelinHub with {}",
+                  res.getResponse().getStatus());
+            } else {
+              LOG.warn("Failed to remove note from ZeppelinHub with HttpStatus {}",
+                  res.getResponse().getStatus());
+            }
+          }
+
+          @Override
+          public void onFailure(Response response, Throwable failure) {
+            LOG.error("Failed to remove note from ZeppelinHub: {}", response.getReason(), failure);
+          }
+        });
+  }
+
+  public void close() {
+    try {
+      client.stop();
+    } catch (Exception e) {
+      LOG.info("Couldn't stop ZeppelinHub client properly", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java
new file mode 100644
index 0000000..4b8b42d
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/security/Authentication.java
@@ -0,0 +1,228 @@
+package org.apache.zeppelin.notebook.repo.zeppelinhub.security;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.util.Collections;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.NameValuePair;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Authentication module.
+ *
+ */
+public class Authentication implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(Authentication.class);
+  private String principal = "anonymous";
+  private String ticket = "anonymous";
+  private String roles = StringUtils.EMPTY;
+
+  private final HttpClient client;
+  private String loginEndpoint;
+
+  // Cipher is an AES in CBC mode
+  private static final String CIPHER_ALGORITHM = "AES";
+  private static final String CIPHER_MODE = "AES/CBC/PKCS5PADDING";
+  private static final String KEY = "AbtEr99DxsWWbJkP";
+  private static final int ivSize = 16;
+
+  private static final String ZEPPELIN_CONF_ANONYMOUS_ALLOWED = "zeppelin.anonymous.allowed";
+  private static final String ZEPPELINHUB_USER_KEY = "zeppelinhub.user.key";
+  private String token;
+  private boolean authEnabled;
+  private boolean authenticated;
+  String userKey;
+
+  private Gson gson = new Gson();
+  private static Authentication instance = null;
+
+  public static Authentication initialize(String token, ZeppelinConfiguration conf) {
+    if (instance == null && conf != null) {
+      instance = new Authentication(token, conf);
+    }
+    return instance;
+  }
+
+  public static Authentication getInstance() {
+    return instance;
+  }
+
+  private Authentication(String token, ZeppelinConfiguration conf) {
+    MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
+    client = new HttpClient(connectionManager);
+    this.token = token;
+
+    authEnabled = !conf.getBoolean("ZEPPELIN_ALLOW_ANONYMOUS",
+        ZEPPELIN_CONF_ANONYMOUS_ALLOWED, true);
+
+    userKey = conf.getString("ZEPPELINHUB_USER_KEY",
+        ZEPPELINHUB_USER_KEY, "");
+
+    loginEndpoint = getLoginEndpoint(conf);
+  }
+
+  public String getPrincipal() {
+    return this.principal;
+  }
+
+  public String getTicket() {
+    return this.ticket;
+  }
+
+  public String getRoles() {
+    return this.roles;
+  }
+
+  public boolean isAuthenticated() {
+    return authenticated;
+  }
+  private String getLoginEndpoint(ZeppelinConfiguration conf) {
+    int port = conf.getInt("ZEPPELIN_PORT", "zeppelin.server.port" , 8080);
+    if (port <= 0) {
+      port = 8080;
+    }
+    String scheme = "http";
+    if (conf.useSsl()) {
+      scheme = "https";
+    }
+    String endpoint = scheme + "://localhost:" + port + "/api/login";
+    return endpoint;
+  }
+
+  public boolean authenticate() {
+    if (authEnabled) {
+      if (!StringUtils.isEmpty(userKey)) {
+        String authKey = getAuthKey(userKey);
+        Map<String, String> authCredentials = login(authKey, loginEndpoint);
+        if (isEmptyMap(authCredentials)) {
+          return false;
+        }
+        principal = authCredentials.containsKey("principal") ? authCredentials.get("principal")
+            : principal;
+        ticket = authCredentials.containsKey("ticket") ? authCredentials.get("ticket") : ticket;
+        roles = authCredentials.containsKey("roles") ? authCredentials.get("roles") : roles;
+        LOG.info("Authenticated into Zeppelin as {} and roles {}", principal, roles);
+        return true;
+      } else {
+        LOG.warn("ZEPPELINHUB_USER_KEY isn't provided. Please provide your credentials"
+            + "for your instance in ZeppelinHub website and generate your key.");
+      }
+    }
+    return false;
+  }
+
+  // returns login:password
+  private String getAuthKey(String userKey) {
+    LOG.debug("Encrypted user key is {}", userKey);
+    if (StringUtils.isBlank(userKey)) {
+      LOG.warn("ZEPPELINHUB_USER_KEY is blank");
+      return StringUtils.EMPTY;
+    }
+    //use hashed token as a salt
+    String hashedToken = Integer.toString(token.hashCode());
+    return decrypt(userKey, hashedToken); 
+  }
+
+  private String decrypt(String value, String initVector) {
+    LOG.debug("IV is {}, IV length is {}", initVector, initVector.length());
+    if (StringUtils.isBlank(value) || StringUtils.isBlank(initVector)) {
+      LOG.error("String to decode or salt is not provided");
+      return StringUtils.EMPTY;
+    }
+    try {
+      IvParameterSpec iv = generateIV(initVector);
+      Key key = generateKey();
+
+      Cipher cipher = Cipher.getInstance(CIPHER_MODE);
+      cipher.init(Cipher.DECRYPT_MODE, key, iv);
+
+      byte[] decryptedString = Base64.decodeBase64(toBytes(value));
+      decryptedString = cipher.doFinal(decryptedString);
+      return new String(decryptedString);
+    } catch (GeneralSecurityException e) {
+      LOG.error("Error when decrypting", e);
+      return StringUtils.EMPTY;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<String, String> login(String authKey, String endpoint) {
+    String[] credentials = authKey.split(":");
+    if (credentials.length != 2) {
+      return Collections.emptyMap();
+    }
+    PostMethod post = new PostMethod(endpoint);
+    post.addRequestHeader("Origin", "http://localhost");
+    post.addParameter(new NameValuePair("userName", credentials[0]));
+    post.addParameter(new NameValuePair("password", credentials[1]));
+    try {
+      int code = client.executeMethod(post);
+      if (code == HttpStatus.SC_OK) {
+        String content = post.getResponseBodyAsString();
+        Map<String, Object> resp = gson.fromJson(content, 
+            new TypeToken<Map<String, Object>>() {}.getType());
+        LOG.info("Received from Zeppelin LoginRestApi : " + content);
+        return (Map<String, String>) resp.get("body");
+      } else {
+        LOG.error("Failed Zeppelin login {}, status code {}", endpoint, code);
+        return Collections.emptyMap();
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot login into Zeppelin", e);
+      return Collections.emptyMap();
+    }
+  }
+
+  private Key generateKey() {
+    return new SecretKeySpec(toBytes(KEY), CIPHER_ALGORITHM);
+  }
+
+  private byte[] toBytes(String value) {
+    byte[] bytes;
+    try {
+      bytes = value.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      LOG.warn("UTF-8 isn't supported ", e);
+      bytes = value.getBytes();
+    }
+    return bytes;
+  }
+
+  private IvParameterSpec generateIV(String ivString) {
+    byte[] ivFromBytes = toBytes(ivString);
+    byte[] iv16ToBytes = new byte[ivSize];
+    System.arraycopy(ivFromBytes, 0, iv16ToBytes, 0, Math.min(ivFromBytes.length, ivSize));
+    return new IvParameterSpec(iv16ToBytes);
+  }
+
+  private boolean isEmptyMap(Map<String, String> map) {
+    return map == null || map.isEmpty();
+  }
+
+  @Override
+  public void run() {
+    authenticated = authenticate();
+    LOG.info("Scheduled authentication status is {}", authenticated);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
new file mode 100644
index 0000000..fe2eabf
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/Client.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client to connect Zeppelin and ZeppelinHub via websocket API.
+ * Implemented using singleton pattern.
+ * 
+ */
+public class Client {
+  private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+  private final ZeppelinhubClient zeppelinhubClient;
+  private final ZeppelinClient zeppelinClient;
+  private static Client instance = null;
+
+  public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token, 
+      ZeppelinConfiguration conf) {
+    if (instance == null) {
+      instance = new Client(zeppelinUri, zeppelinhubUri, token, conf);
+    }
+    return instance;
+  }
+
+  public static Client getInstance() {
+    return instance;
+  }
+
+  private Client(String zeppelinUri, String zeppelinhubUri, String token,
+      ZeppelinConfiguration conf) {
+    LOG.debug("Init Client");
+    zeppelinhubClient = ZeppelinhubClient.initialize(zeppelinhubUri, token);
+    zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token, conf);
+  }
+
+  public void start() {
+    if (zeppelinhubClient != null) {
+      zeppelinhubClient.start();
+    }
+    if (zeppelinClient != null) {
+      zeppelinClient.start();
+    }
+  }
+
+  public void stop() {
+    if (zeppelinhubClient != null) {
+      zeppelinhubClient.stop();
+    }
+    if (zeppelinClient != null) {
+      zeppelinClient.stop();
+    }
+  }
+
+  public void relayToZeppelinHub(String message) {
+    zeppelinhubClient.send(message);
+  }
+
+  public void relayToZeppelin(Message message, String noteId) {
+    zeppelinClient.send(message, noteId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
new file mode 100644
index 0000000..8ac6ebe
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHeartbeat;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+/**
+ * Zeppelin websocket client.
+ *
+ */
+public class ZeppelinClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinClient.class);
+  private final URI zeppelinWebsocketUrl;
+  private final String zeppelinhubToken;
+  private final WebSocketClient wsClient;
+  private static Gson gson;
+  private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
+  private static ZeppelinClient instance = null;
+  private SchedulerService schedulerService;
+  private Authentication authModule;
+  private static final int min = 60;
+
+  public static ZeppelinClient initialize(String zeppelinUrl, String token, 
+      ZeppelinConfiguration conf) {
+    if (instance == null) {
+      instance = new ZeppelinClient(zeppelinUrl, token, conf);
+    }
+    return instance;
+  }
+
+  public static ZeppelinClient getInstance() {
+    return instance;
+  }
+
+  private ZeppelinClient(String zeppelinUrl, String token, ZeppelinConfiguration conf) {
+    zeppelinWebsocketUrl = URI.create(zeppelinUrl);
+    zeppelinhubToken = token;
+    wsClient = createNewWebsocketClient();
+    gson = new Gson();
+    zeppelinConnectionMap = new ConcurrentHashMap<>();
+    schedulerService = SchedulerService.getInstance();
+    authModule = Authentication.initialize(token, conf);
+    if (authModule != null) {
+      SchedulerService.getInstance().addOnce(authModule, 10);
+    }
+    LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
+  }
+
+  private WebSocketClient createNewWebsocketClient() {
+    SslContextFactory sslContextFactory = new SslContextFactory();
+    WebSocketClient client = new WebSocketClient(sslContextFactory);
+    client.setMaxIdleTimeout(5 * min * 1000);
+    //TODO(khalid): other client settings
+    return client;
+  }
+
+  public void start() {
+    try {
+      if (wsClient != null) {
+        wsClient.start();
+        addRoutines();
+      } else {
+        LOG.warn("Cannot start zeppelin websocket client - isn't initialized");
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot start Zeppelin websocket client", e);
+    }
+  }
+
+  private void addRoutines() {
+    schedulerService.add(ZeppelinHeartbeat.newInstance(this), 15, 4 * min);
+  }
+
+  public void stop() {
+    try {
+      if (wsClient != null) {
+        removeAllZeppelinConnections();
+        wsClient.stop();
+      } else {
+        LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot stop Zeppelin websocket client", e);
+    }
+  }
+
+  public String serialize(Message zeppelinMsg) {
+    if (credentialsAvailable()) {
+      zeppelinMsg.principal = authModule.getPrincipal();
+      zeppelinMsg.ticket = authModule.getTicket();
+      zeppelinMsg.roles = authModule.getRoles();
+    }
+    String msg = gson.toJson(zeppelinMsg);
+    return msg;
+  }
+
+  private boolean credentialsAvailable() {
+    return Authentication.getInstance() != null && Authentication.getInstance().isAuthenticated();
+  }
+
+  public Message deserialize(String zeppelinMessage) {
+    if (StringUtils.isBlank(zeppelinMessage)) {
+      return null;
+    }
+    Message msg;
+    try {
+      msg = gson.fromJson(zeppelinMessage, Message.class);
+    } catch (JsonSyntaxException ex) {
+      LOG.error("Cannot deserialize zeppelin message", ex);
+      msg = null;
+    }
+    return msg;
+  }
+
+  public void send(Message msg, String noteId) {
+    Session noteSession = getZeppelinConnection(noteId);
+    if (!isSessionOpen(noteSession)) {
+      LOG.error("Cannot open websocket connection to Zeppelin note {}", noteId);
+      return;
+    }
+    noteSession.getRemote().sendStringByFuture(serialize(msg));
+  }
+
+  private boolean isSessionOpen(Session session) {
+    return (session != null) && (session.isOpen());
+  }
+  
+  /* per notebook based ws connection, returns null if can't connect */
+  public Session getZeppelinConnection(String noteId) {
+    if (StringUtils.isBlank(noteId)) {
+      LOG.warn("Cannot return websocket connection for blank noteId");
+      return null;
+    }
+
+    if (zeppelinConnectionMap.containsKey(noteId)) {
+      LOG.info("Connection for {} exists in map", noteId);
+      return getNoteSession(noteId);
+    }
+    //TODO(khalid): clean log later
+    LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
+    return openNoteSession(noteId);
+  }
+
+  private Message zeppelinGetNoteMsg(String noteId) {
+    Message getNoteMsg = new Message(Message.OP.GET_NOTE);
+    HashMap<String, Object> data = new HashMap<String, Object>();
+    data.put("id", noteId);
+    getNoteMsg.data = data;
+    return getNoteMsg;
+  }
+  
+  private Session getNoteSession(String noteId) {
+    Session session = zeppelinConnectionMap.get(noteId);
+    if (session == null || !session.isOpen()) {
+      LOG.info("Not connection to {}", noteId);
+      zeppelinConnectionMap.remove(noteId);
+      session = openNoteSession(noteId);
+    }
+    return session;
+  }
+  
+  private Session openNoteSession(String noteId) {
+    ClientUpgradeRequest request = new ClientUpgradeRequest();
+    ZeppelinWebsocket socket = new ZeppelinWebsocket(noteId);
+    Future<Session> future = null;
+    Session session = null;
+    try {
+      future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
+      session = future.get();
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
+      return session;
+    }
+
+    if (zeppelinConnectionMap.containsKey(noteId)) {
+      session.close();
+      session = zeppelinConnectionMap.get(noteId);
+    } else {
+      String getNote = serialize(zeppelinGetNoteMsg(noteId));
+      // TODO(khalid): may need to check return whether successful
+      session.getRemote().sendStringByFuture(getNote);
+      zeppelinConnectionMap.put(noteId, session);
+    }
+    return session;
+  }
+
+  public void handleMsgFromZeppelin(String message, String noteId) {
+    Map<String, String> meta = new HashMap<String, String>();
+    meta.put("token", zeppelinhubToken);
+    meta.put("noteId", noteId);
+    Message zeppelinMsg = deserialize(message);
+    if (zeppelinMsg == null) {
+      return;
+    }
+    ZeppelinhubMessage hubMsg = ZeppelinhubMessage.newMessage(zeppelinMsg, meta);
+    Client client = Client.getInstance();
+    if (client == null) {
+      LOG.warn("Client isn't initialized yet");
+      return;
+    }
+    client.relayToZeppelinHub(hubMsg.serialize());
+  }
+
+  /**
+   * Close and remove ZeppelinConnection
+   */
+  public void removeZeppelinConnection(String noteId) {
+    if (zeppelinConnectionMap.containsKey(noteId)) {
+      Session conn = zeppelinConnectionMap.get(noteId);
+      if (conn.isOpen()) {
+        conn.close();
+      }
+      zeppelinConnectionMap.remove(noteId);
+    }
+    // TODO(khalid): clean log later
+    LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
+  }
+
+  /**
+   * Close and remove all ZeppelinConnection
+   */
+  public void removeAllZeppelinConnections() {
+    for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
+      if (isSessionOpen(entry.getValue())) {
+        entry.getValue().close();
+      }
+      zeppelinConnectionMap.remove(entry.getKey());
+    }
+    LOG.info("Removed all Zeppelin ws connections");
+  }
+
+  public void pingAllNotes() {
+    for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
+      if (isSessionOpen(entry.getValue())) {
+        send(new Message(OP.PING), entry.getKey());
+      } else {
+        // for cleanup
+        zeppelinConnectionMap.remove(entry.getKey());
+      }
+    }
+  }
+
+  public int countConnectedNotes() {
+    return zeppelinConnectionMap.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
new file mode 100644
index 0000000..9bcd035
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinhubClient.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
+
+
+import java.io.IOException;
+import java.net.HttpCookie;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinhubWebsocket;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinHubOp;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.session.ZeppelinhubSession;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.util.json.JSONArray;
+import com.amazonaws.util.json.JSONException;
+import com.amazonaws.util.json.JSONObject;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Manage a zeppelinhub websocket connection.
+ */
+public class ZeppelinhubClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubClient.class);
+
+  private final WebSocketClient client;
+  private final URI zeppelinhubWebsocketUrl;
+  private final ClientUpgradeRequest conectionRequest;
+  private final String zeppelinhubToken;
+  
+  private static final int MB = 1048576;
+  private static final int MAXIMUN_TEXT_SIZE = 64 * MB;
+  private static final long CONNECTION_IDLE_TIME = TimeUnit.SECONDS.toMillis(30);
+  private static ZeppelinhubClient instance = null;
+  private static Gson gson;
+  
+  private SchedulerService schedulerService;
+  private ZeppelinhubSession zeppelinhubSession;
+
+  public static ZeppelinhubClient initialize(String zeppelinhubUrl, String token) {
+    if (instance == null) {
+      instance = new ZeppelinhubClient(zeppelinhubUrl, token);
+    }
+    return instance;
+  }
+
+  public static ZeppelinhubClient getInstance() {
+    return instance;
+  }
+
+  private ZeppelinhubClient(String url, String token) {
+    zeppelinhubWebsocketUrl = URI.create(url);
+    client = createNewWebsocketClient();
+    conectionRequest = setConnectionrequest(token);
+    zeppelinhubToken = token;
+    schedulerService = SchedulerService.create(10);
+    gson = new Gson();
+    LOG.info("Initialized ZeppelinHub websocket client on {}", zeppelinhubWebsocketUrl);
+  }
+
+  public void start() {
+    try {
+      client.start();
+      zeppelinhubSession = connect();
+      addRoutines();
+    } catch (Exception e) {
+      LOG.error("Cannot connect to zeppelinhub via websocket", e);
+    }
+  }
+  
+  public void stop() {
+    LOG.info("Stopping Zeppelinhub websocket client");
+    try {
+      zeppelinhubSession.close();
+      schedulerService.close();
+      client.stop();
+    } catch (Exception e) {
+      LOG.error("Cannot stop zeppelinhub websocket client", e);
+    }
+  }
+
+  public String getToken() {
+    return this.zeppelinhubToken;
+  }
+  
+  public void send(String msg) {
+    if (!isConnectedToZeppelinhub()) {
+      LOG.info("Zeppelinhub connection is not open, opening it");
+      zeppelinhubSession = connect();
+      if (zeppelinhubSession == ZeppelinhubSession.EMPTY) {
+        LOG.warn("While connecting to ZeppelinHub received empty session, cannot send the message");
+        return;
+      }
+    }
+    zeppelinhubSession.sendByFuture(msg);
+  }
+  
+  private boolean isConnectedToZeppelinhub() {
+    return (zeppelinhubSession != null && zeppelinhubSession.isSessionOpen());
+  }
+
+  private ZeppelinhubSession connect() {
+    ZeppelinhubSession zeppelinSession;
+    try {
+      ZeppelinhubWebsocket ws = ZeppelinhubWebsocket.newInstance(zeppelinhubToken);
+      Future<Session> future = client.connect(ws, zeppelinhubWebsocketUrl, conectionRequest);
+      Session session = future.get();
+      zeppelinSession = ZeppelinhubSession.createInstance(session, zeppelinhubToken);
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      LOG.info("Couldnt connect to zeppelinhub", e);
+      zeppelinSession = ZeppelinhubSession.EMPTY;
+    }
+    return zeppelinSession;
+  }
+  
+  private ClientUpgradeRequest setConnectionrequest(String token) {
+    ClientUpgradeRequest request = new ClientUpgradeRequest();
+    request.setCookies(Lists.newArrayList(new HttpCookie(ZeppelinHubRepo.TOKEN_HEADER, token)));
+    return request;
+  }
+  
+  private WebSocketClient createNewWebsocketClient() {
+    SslContextFactory sslContextFactory = new SslContextFactory();
+    WebSocketClient client = new WebSocketClient(sslContextFactory);
+    client.setMaxTextMessageBufferSize(MAXIMUN_TEXT_SIZE);
+    client.setMaxIdleTimeout(CONNECTION_IDLE_TIME);
+    return client;
+  }
+  
+  private void addRoutines() {
+    schedulerService.add(ZeppelinHubHeartbeat.newInstance(this), 10, 23);
+  }
+
+  public void handleMsgFromZeppelinHub(String message) {
+    ZeppelinhubMessage hubMsg = ZeppelinhubMessage.deserialize(message);
+    if (hubMsg.equals(ZeppelinhubMessage.EMPTY)) {
+      LOG.error("Cannot handle ZeppelinHub message is empty");
+      return;
+    }
+    String op = StringUtils.EMPTY;
+    if (hubMsg.op instanceof String) {
+      op = (String) hubMsg.op;
+    } else {
+      LOG.error("Message OP from ZeppelinHub isn't string {}", hubMsg.op);
+      return;
+    }
+    if (ZeppelinhubUtils.isZeppelinHubOp(op)) {
+      handleZeppelinHubOpMsg(ZeppelinhubUtils.toZeppelinHubOp(op), hubMsg, message);
+    } else if (ZeppelinhubUtils.isZeppelinOp(op)) {
+      forwardToZeppelin(ZeppelinhubUtils.toZeppelinOp(op), hubMsg);
+    }
+  }
+
+  private void handleZeppelinHubOpMsg(ZeppelinHubOp op, ZeppelinhubMessage hubMsg, String msg) {
+    if (op == null || msg.equals(ZeppelinhubMessage.EMPTY)) {
+      LOG.error("Cannot handle empty op or msg");
+      return;
+    }
+    switch (op) {
+        case RUN_NOTEBOOK:
+          runAllParagraph(hubMsg.meta.get("noteId"), msg);
+          break;
+        default:
+          LOG.warn("Received {} from ZeppelinHub, not handled", op);
+          break;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void forwardToZeppelin(Message.OP op, ZeppelinhubMessage hubMsg) {
+    Message zeppelinMsg = new Message(op);
+    if (!(hubMsg.data instanceof Map)) {
+      LOG.error("Data field of message from ZeppelinHub isn't in correct Map format");
+      return;
+    }
+    zeppelinMsg.data = (Map<String, Object>) hubMsg.data;
+    Client client = Client.getInstance();
+    if (client == null) {
+      LOG.warn("Base client isn't initialized, returning");
+      return;
+    }
+    client.relayToZeppelin(zeppelinMsg, hubMsg.meta.get("noteId"));
+  }
+
+  boolean runAllParagraph(String noteId, String hubMsg) {
+    LOG.info("Running paragraph with noteId {}", noteId);
+    try {
+      JSONObject data = new JSONObject(hubMsg);
+      if (data.equals(JSONObject.NULL) || !(data.get("data") instanceof JSONArray)) {
+        LOG.error("Wrong \"data\" format for RUN_NOTEBOOK");
+        return false;
+      }
+      Client client = Client.getInstance();
+      if (client == null) {
+        LOG.warn("Base client isn't initialized, returning");
+        return false;
+      }
+      Message zeppelinMsg = new Message(OP.RUN_PARAGRAPH);
+
+      JSONArray paragraphs = data.getJSONArray("data");
+      for (int i = 0; i < paragraphs.length(); i++) {
+        if (!(paragraphs.get(i) instanceof JSONObject)) {
+          LOG.warn("Wrong \"paragraph\" format for RUN_NOTEBOOK");
+          continue;
+        }
+        zeppelinMsg.data = gson.fromJson(paragraphs.getString(i), 
+            new TypeToken<Map<String, Object>>(){}.getType());
+        client.relayToZeppelin(zeppelinMsg, noteId);
+        LOG.info("\nSending RUN_PARAGRAPH message to Zeppelin ");
+      }
+    } catch (JSONException e) {
+      LOG.error("Failed to parse RUN_NOTEBOOK message from ZeppelinHub ", e);
+      return false;
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
new file mode 100644
index 0000000..facfcab
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
+
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeppelin websocket listener class.
+ *
+ */
+public class ZeppelinWebsocket implements WebSocketListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
+  public Session connection;
+  public String noteId;
+
+  public ZeppelinWebsocket(String noteId) {
+    this.noteId = noteId;
+  }
+
+  @Override
+  public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
+
+  }
+
+  @Override
+  public void onWebSocketClose(int code, String message) {
+    LOG.info("Zeppelin connection closed with code: {}, message: {}", code, message);
+    // parentClient.removeConnMap(noteId);
+  }
+
+  @Override
+  public void onWebSocketConnect(Session session) {
+    LOG.info("Zeppelin connection opened");
+    this.connection = session;
+  }
+
+  @Override
+  public void onWebSocketError(Throwable e) {
+    LOG.warn("Zeppelin socket connection error ", e);
+  }
+
+  @Override
+  public void onWebSocketText(String data) {
+    LOG.debug("Zeppelin client received Message: " + data);
+    // propagate to ZeppelinHub
+    try {
+      ZeppelinClient zeppelinClient = ZeppelinClient.getInstance();
+      if (zeppelinClient != null) {
+        zeppelinClient.handleMsgFromZeppelin(data, noteId);
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to send message to ZeppelinHub: ", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java
new file mode 100644
index 0000000..e28054a
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinhubWebsocket.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
+
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinhubClient;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeppelinhub websocket handler.
+ */
+public class ZeppelinhubWebsocket implements WebSocketListener {
+  private Logger LOG = LoggerFactory.getLogger(ZeppelinhubWebsocket.class);
+  private Session zeppelinHubSession;
+  private final String token;
+  
+  private ZeppelinhubWebsocket(String token) {
+    this.token = token;
+  }
+
+  public static ZeppelinhubWebsocket newInstance(String token) {
+    return new ZeppelinhubWebsocket(token);
+  }
+  
+  @Override
+  public void onWebSocketBinary(byte[] payload, int offset, int len) {}
+
+  @Override
+  public void onWebSocketClose(int statusCode, String reason) {
+    LOG.info("Closing websocket connection [{}] : {}", statusCode, reason);
+    send(ZeppelinhubUtils.deadMessage(token));
+    this.zeppelinHubSession = null;
+  }
+
+  @Override
+  public void onWebSocketConnect(Session session) {
+    LOG.info("Opening a new session to Zeppelinhub {}", session.hashCode());
+    this.zeppelinHubSession = session;
+    send(ZeppelinhubUtils.liveMessage(token));
+  }
+
+  @Override
+  public void onWebSocketError(Throwable cause) {
+    LOG.error("Got error", cause);
+  }
+
+  @Override
+  public void onWebSocketText(String message) {
+    // handle message from ZeppelinHub.
+    ZeppelinhubClient client = ZeppelinhubClient.getInstance();
+    if (client != null) {
+      client.handleMsgFromZeppelinHub(message);
+    }
+  }
+
+  private boolean isSessionOpen() {
+    return ((zeppelinHubSession != null) && (zeppelinHubSession.isOpen())) ? true : false;
+  }
+  
+  private void send(String msg) {
+    if (isSessionOpen()) {
+      zeppelinHubSession.getRemote().sendStringByFuture(msg);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java
new file mode 100644
index 0000000..80d5f06
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinHubOp.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
+
+/**
+ * Zeppelinhub Op.
+ */
+public enum ZeppelinHubOp {
+  LIVE,
+  DEAD,
+  PING,
+  PONG,
+  RUN_NOTEBOOK,
+  WELCOME,
+  ZEPPELIN_STATUS
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java
new file mode 100644
index 0000000..aa1ce21
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/protocol/ZeppelinhubMessage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+/**
+ * Zeppelinhub message class.
+ *
+ */
+public class ZeppelinhubMessage {
+  private static final Gson gson = new Gson();
+  private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+  public static final ZeppelinhubMessage EMPTY = new ZeppelinhubMessage();
+
+  public Object op;
+  public Object data;
+  public Map<String, String> meta = Maps.newHashMap();
+  
+  private ZeppelinhubMessage() {
+    this.op = OP.LIST_NOTES;
+    this.data = null;
+  }
+  
+  private ZeppelinhubMessage(Object op, Object data, Map<String, String> meta) {
+    this.op = op;
+    this.data = data;
+    this.meta = meta;
+  }
+  
+  public static ZeppelinhubMessage newMessage(Object op, Object data, Map<String, String> meta) {
+    return new ZeppelinhubMessage(op, data, meta);
+  }
+
+  public static ZeppelinhubMessage newMessage(Message zeppelinMsg, Map<String, String> meta) {
+    if (zeppelinMsg == null) {
+      return EMPTY;
+    }
+    return new ZeppelinhubMessage(zeppelinMsg.op, zeppelinMsg.data, meta);
+  }
+
+  public String serialize() {
+    return gson.toJson(this, ZeppelinhubMessage.class);
+  }
+
+  public static ZeppelinhubMessage deserialize(String zeppelinhubMessage) {
+    if (StringUtils.isBlank(zeppelinhubMessage)) {
+      return EMPTY;
+    }
+    ZeppelinhubMessage msg;
+    try {
+      msg = gson.fromJson(zeppelinhubMessage, ZeppelinhubMessage.class);
+    } catch (JsonSyntaxException ex) {
+      LOG.error("Cannot deserialize zeppelinhub message", ex);
+      msg = EMPTY;
+    }
+    return msg;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8cde5c9b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java
new file mode 100644
index 0000000..024a3c0
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/SchedulerService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Creates a thread pool that can schedule zeppelinhub commands.
+ *
+ */
+public class SchedulerService {
+
+  private final ScheduledExecutorService pool;
+  private static SchedulerService instance = null;
+
+  private SchedulerService(int numberOfThread) {
+    pool = Executors.newScheduledThreadPool(numberOfThread);
+  }
+
+  public static SchedulerService create(int numberOfThread) {
+    if (instance == null) {
+      instance = new SchedulerService(numberOfThread);
+    }
+    return instance;
+  }
+
+  public static SchedulerService getInstance() {
+    if (instance == null) {
+      instance = new SchedulerService(2);
+    }
+    return instance;
+  }
+
+  public void add(Runnable service, int firstExecution, int period) {
+    pool.scheduleAtFixedRate(service, firstExecution, period, TimeUnit.SECONDS);
+  }
+
+  public void addOnce(Runnable service, int firstExecution) {
+    pool.schedule(service, firstExecution, TimeUnit.SECONDS);
+  }
+
+  public void close() {
+    pool.shutdown();
+  }
+
+}