You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by an...@apache.org on 2016/11/24 11:56:27 UTC

zeppelin git commit: [ZEPPELIN-1690] - ZeppelinHubNotebookRepo multy user handling

Repository: zeppelin
Updated Branches:
  refs/heads/master 3389e8cfc -> 33e2dab37


[ZEPPELIN-1690] - ZeppelinHubNotebookRepo multy user handling

### What is this PR for?
 This PR bring multi user handling to ZeppelinHubNotebookRepo.

### What type of PR is it?
[Improvement ]

### What is the Jira issue?
 * [ZEPPELIN-1690](https://issues.apache.org/jira/browse/ZEPPELIN-1690)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Anthony Corbacho <co...@gmail.com>

Closes #1635 from anthonycorbacho/feat/ZeppelinHubRepoMultiUser and squashes the following commits:

d9e0036 [Anthony Corbacho] Move code location
d8989aa [Anthony Corbacho] Handle invalid subject
aef6e00 [Anthony Corbacho] Fix check style
edb9e8c [Anthony Corbacho] Desactivate ws :: we will need to refactor this part
e884203 [Anthony Corbacho] Refactor :: remove 'async' on every http call
dbb8ebd [Anthony Corbacho] Fix test
25f6215 [Anthony Corbacho] pass user token to zeppelinhub rest api handler
674fb93 [Anthony Corbacho] Refactor ZeppelinHub rest API handler  - Now takes a token on every http request
3fbfcfa [Anthony Corbacho] Add new login on how user can get his token at runtime
a8aeb51 [Anthony Corbacho] add comment in zeppelinhubRealm about saving user session in a singleton map
5931ab6 [Anthony Corbacho] Fix check style
67051a0 [Anthony Corbacho] Add ZeppelinHub instance model
e3e5a15 [Anthony Corbacho] Add userTiket in AuthenticationInfo on OnMessage method in notebookServer
7a0c959 [Anthony Corbacho] Add zeppelinhub user session to userSession container after login throght zeppelinhubRealm
0729f51 [Anthony Corbacho] Add zeppelinhub session container


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/33e2dab3
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/33e2dab3
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/33e2dab3

Branch: refs/heads/master
Commit: 33e2dab37ef94aecc262b1b8f75e668cd0db0e60
Parents: 3389e8c
Author: Anthony Corbacho <co...@gmail.com>
Authored: Wed Nov 23 14:43:59 2016 +0900
Committer: Anthony Corbacho <co...@gmail.com>
Committed: Thu Nov 24 20:56:17 2016 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/realm/ZeppelinHubRealm.java |  11 +-
 .../apache/zeppelin/socket/NotebookServer.java  |   3 +-
 .../repo/zeppelinhub/ZeppelinHubRepo.java       | 111 +++++++++++++++----
 .../repo/zeppelinhub/model/Instance.java        |  27 +++++
 .../zeppelinhub/model/UserSessionContainer.java |  54 +++++++++
 .../rest/ZeppelinhubRestApiHandler.java         |  71 +++++++++---
 .../repo/zeppelinhub/ZeppelinHubRepoTest.java   |  18 +--
 7 files changed, 249 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
index 67ed544..c37a0a8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
@@ -37,6 +37,7 @@ import org.apache.shiro.authc.UsernamePasswordToken;
 import org.apache.shiro.authz.AuthorizationInfo;
 import org.apache.shiro.realm.AuthorizingRealm;
 import org.apache.shiro.subject.PrincipalCollection;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
   private static final String USER_LOGIN_API_ENDPOINT = "api/v1/users/login";
   private static final String JSON_CONTENT_TYPE = "application/json";
   private static final String UTF_8_ENCODING = "UTF-8";
+  private static final String USER_SESSION_HEADER = "X-session";
   private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
 
   private final HttpClient httpClient;
@@ -126,6 +128,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
   protected User authenticateUser(String requestBody) {
     PutMethod put = new PutMethod(Joiner.on("/").join(zeppelinhubUrl, USER_LOGIN_API_ENDPOINT));
     String responseBody = StringUtils.EMPTY;
+    String userSession = StringUtils.EMPTY;
     try {
       put.setRequestEntity(new StringRequestEntity(requestBody, JSON_CONTENT_TYPE, UTF_8_ENCODING));
       int statusCode = httpClient.executeMethod(put);
@@ -136,6 +139,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
             + "Login or password incorrect");
       }
       responseBody = put.getResponseBodyAsString();
+      userSession = put.getResponseHeader(USER_SESSION_HEADER).getValue();
       put.releaseConnection();
       
     } catch (IOException e) {
@@ -150,13 +154,16 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
       LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
       throw new AuthenticationException("Cannot login to ZeppelinHub");
     }
-    
+
+    // Add ZeppelinHub user_session token this singleton map, this will help ZeppelinHubRepo
+    // to get specific information about the current user.
+    UserSessionContainer.instance.setSession(account.login, userSession);
+
     /* TODO(khalid): add proper roles and add listener */
     HashSet<String> userAndRoles = new HashSet<String>();
     userAndRoles.add(account.login);
     ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
         new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);
-
     return account;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index ebc5755..c5a39b3 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -190,7 +190,8 @@ public class NotebookServer extends WebSocketServlet implements
       if (StringUtils.isEmpty(conn.getUser())) {
         addUserConnection(messagereceived.principal, conn);
       }
-      AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
+      AuthenticationInfo subject = 
+          new AuthenticationInfo(messagereceived.principal, messagereceived.ticket);
 
       /** Lets be elegant here */
       switch (messagereceived.op) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
index a4b6202..e941c0d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -22,6 +22,8 @@ import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -29,8 +31,9 @@ import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.NoteInfo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
-import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,20 +54,27 @@ public class ZeppelinHubRepo implements NotebookRepo {
   public static final String TOKEN_HEADER = "X-Zeppelin-Token";
   private static final Gson GSON = new Gson();
   private static final Note EMPTY_NOTE = new Note();
-  private final Client websocketClient;
+  //private final Client websocketClient;
 
   private String token;
   private ZeppelinhubRestApiHandler restApiClient;
+  
+  private final ZeppelinConfiguration conf;
 
+  // In order to avoid too many call to ZeppelinHub backend, we save a map of user -> session.
+  private ConcurrentMap<String, String> usersToken = new ConcurrentHashMap<String, String>();
+  
   public ZeppelinHubRepo(ZeppelinConfiguration conf) {
+    this.conf = conf;
     String zeppelinHubUrl = getZeppelinHubUrl(conf);
     LOG.info("Initializing ZeppelinHub integration module");
     token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
     restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
 
-    websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
-        getZeppelinhubWebsocketUri(conf), token, conf);
-    websocketClient.start();
+    // TODO(xxx): refactor this in the next itaration
+    //websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
+    //    getZeppelinhubWebsocketUri(conf), token, conf);
+    //websocketClient.start();
   }
 
   private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
@@ -144,10 +154,56 @@ public class ZeppelinHubRepo implements NotebookRepo {
     }
     return zeppelinhubUrl;
   }
+  
+  /**
+   * Get Token directly from Zeppelinhub.
+   * This will avoid and remove the needs of setting up token in zeppelin-env.sh.
+   */
+  private String getUserZeppelinInstanceToken(String ticket) throws IOException {
+    if (StringUtils.isBlank(ticket)) {
+      return "";
+    }
+
+    List<Instance> instances = restApiClient.getInstances(ticket);
+    // TODO(anthony): Implement NotebookRepo Setting to let user switch token at runtime.
+
+    token = instances.isEmpty() ? StringUtils.EMPTY : instances.get(0).token;
+    return token;
+  }
+
+  /**
+   * For a given user logged in is zeppelin (via zeppelinhub notebook repo), get default token.
+   *  */
+  private String getUserToken(String principal) {
+    String token = usersToken.get(principal);
+    if (StringUtils.isBlank(token)) {
+      String ticket = UserSessionContainer.instance.getSession(principal);
+      try {
+        token = getUserZeppelinInstanceToken(ticket);
+        usersToken.putIfAbsent(principal, token);
+      } catch (IOException e) {
+        LOG.error("Cannot get user token", e);
+        token = StringUtils.EMPTY;
+      }
+    }
+
+    return token;
+  }
 
+  private boolean isSubjectValid(AuthenticationInfo subject) {
+    if (subject == null) {
+      return false;
+    }
+    return (subject.isAnonymous() && !conf.isAnonymousAllowed()) ? false : true;
+  }
+  
   @Override
   public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
-    String response = restApiClient.asyncGet("");
+    if (!isSubjectValid(subject)) {
+      return Collections.emptyList();
+    }
+    String token = getUserToken(subject.getUser());
+    String response = restApiClient.get(token, StringUtils.EMPTY);
     List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
     if (notes == null) {
       return Collections.emptyList();
@@ -158,11 +214,11 @@ public class ZeppelinHubRepo implements NotebookRepo {
 
   @Override
   public Note get(String noteId, AuthenticationInfo subject) throws IOException {
-    if (StringUtils.isBlank(noteId)) {
+    if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
       return EMPTY_NOTE;
     }
-    //String response = zeppelinhubHandler.get(noteId);
-    String response = restApiClient.asyncGet(noteId);
+    String token = getUserToken(subject.getUser());
+    String response = restApiClient.get(token, noteId);
     Note note = GSON.fromJson(response, Note.class);
     if (note == null) {
       return EMPTY_NOTE;
@@ -173,45 +229,55 @@ public class ZeppelinHubRepo implements NotebookRepo {
 
   @Override
   public void save(Note note, AuthenticationInfo subject) throws IOException {
-    if (note == null) {
-      throw new IOException("Zeppelinhub failed to save empty note");
+    if (note == null || !isSubjectValid(subject)) {
+      throw new IOException("Zeppelinhub failed to save note");
     }
-    String notebook = GSON.toJson(note);
-    restApiClient.asyncPut(notebook);
-    LOG.info("ZeppelinHub REST API saving note {} ", note.getId()); 
+    String jsonNote = GSON.toJson(note);
+    String token = getUserToken(subject.getUser());
+    LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
+    restApiClient.put(token, jsonNote);
   }
 
   @Override
   public void remove(String noteId, AuthenticationInfo subject) throws IOException {
-    restApiClient.asyncDel(noteId);
+    if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
+      throw new IOException("Zeppelinhub failed to remove note");
+    }
+    String token = getUserToken(subject.getUser());
     LOG.info("ZeppelinHub REST API removing note {} ", noteId);
+    restApiClient.del(token, noteId);
   }
 
   @Override
   public void close() {
-    websocketClient.stop();
+    //websocketClient.stop();
   }
 
   @Override
   public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
       throws IOException {
-    if (StringUtils.isBlank(noteId)) {
+    if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
       return null;
     }
     String endpoint = Joiner.on("/").join(noteId, "checkpoint");
     String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg));
-    String response = restApiClient.asyncPutWithResponseBody(endpoint, content);
     
+    String token = getUserToken(subject.getUser());
+    String response = restApiClient.putWithResponseBody(token, endpoint, content);
+
     return GSON.fromJson(response, Revision.class);
   }
 
   @Override
   public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
-    if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId)) {
+    if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId) || !isSubjectValid(subject)) {
       return EMPTY_NOTE;
     }
     String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
-    String response = restApiClient.asyncGet(endpoint);
+    
+    String token = getUserToken(subject.getUser());
+    String response = restApiClient.get(token, endpoint);
+
     Note note = GSON.fromJson(response, Note.class);
     if (note == null) {
       return EMPTY_NOTE;
@@ -222,13 +288,14 @@ public class ZeppelinHubRepo implements NotebookRepo {
 
   @Override
   public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
-    if (StringUtils.isBlank(noteId)) {
+    if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
       return Collections.emptyList();
     }
     String endpoint = Joiner.on("/").join(noteId, "checkpoint");
     List<Revision> history = Collections.emptyList();
     try {
-      String response = restApiClient.asyncGet(endpoint);
+      String token = getUserToken(subject.getUser());
+      String response = restApiClient.get(token, endpoint);
       history = GSON.fromJson(response, new TypeToken<List<Revision>>(){}.getType());
     } catch (IOException e) {
       LOG.error("Cannot get note history", e);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java
new file mode 100644
index 0000000..2767962
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.model;
+
+/**
+ * ZeppelinHub Instance structure.
+ *
+ */
+public class Instance {
+  public int id;
+  public String name;
+  public String token;
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
new file mode 100644
index 0000000..c741e77
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.model;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Simple and yet dummy container for zeppelinhub session.
+ * 
+ */
+public class UserSessionContainer {
+  private static class Entity {
+    public final String userSession;
+
+    Entity(String userSession) {
+      this.userSession = userSession;
+    }
+  }
+
+  private Map<String, Entity> sessions = new ConcurrentHashMap<>();
+
+  public static final UserSessionContainer instance = new UserSessionContainer();
+
+  public synchronized String getSession(String principal) {
+    Entity entry = sessions.get(principal);
+    if (entry == null) {
+      return StringUtils.EMPTY;
+    }
+    return entry.userSession;
+  }
+  
+  public synchronized String setSession(String principal, String userSession) {
+    Entity entry = new Entity(userSession);
+    sessions.put(principal, entry);
+    return entry.userSession;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
index 63699e6..a913d85 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -18,12 +18,16 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
@@ -35,6 +39,9 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
 /**
  * REST API handler.
  *
@@ -42,6 +49,7 @@ import org.slf4j.LoggerFactory;
 public class ZeppelinhubRestApiHandler {
   private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
   public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
+  private static final String USER_SESSION_HEADER = "X-User-Session";
   private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
   private static boolean PROXY_ON = false;
   private static String PROXY_HOST;
@@ -49,16 +57,13 @@ public class ZeppelinhubRestApiHandler {
 
   private final HttpClient client;
   private final String zepelinhubUrl;
-  private final String token;
 
-  public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl,
-      String token) {
+  public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl, String token) {
     return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
   }
 
   private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
     this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
-    this.token = token;
 
     //TODO(khalid):to make proxy conf consistent with Zeppelin confs
     //readProxyConf();
@@ -114,35 +119,75 @@ public class ZeppelinhubRestApiHandler {
     return httpClient;
   }
 
-  public String asyncGet(String argument) throws IOException {
-    return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument, StringUtils.EMPTY, true);
+  /**
+   * Fetch zeppelin instances for a given user.
+   * @param ticket
+   * @return
+   * @throws IOException
+   */
+  public List<Instance> getInstances(String ticket) throws IOException {
+    InputStreamResponseListener listener = new InputStreamResponseListener();
+    Response response;
+    String url = zepelinhubUrl + "instances";
+    String data;
+
+    Request request = client.newRequest(url).header(USER_SESSION_HEADER, ticket);
+    request.send(listener);
+
+    try {
+      response = listener.get(30, TimeUnit.SECONDS);
+    } catch (InterruptedException | TimeoutException | ExecutionException e) {
+      LOG.error("Cannot perform request to ZeppelinHub", e);
+      throw new IOException("Cannot perform  GET request to ZeppelinHub", e);
+    }
+
+    int code = response.getStatus();
+    if (code == 200) {
+      try (InputStream responseContent = listener.getInputStream()) {
+        data = IOUtils.toString(responseContent, "UTF-8");
+      }
+    } else {
+      LOG.error("ZeppelinHub GET {} returned with status {} ", url, code);
+      throw new IOException("Cannot perform  GET request to ZeppelinHub");
+    }
+    Type listType = new TypeToken<ArrayList<Instance>>() {}.getType();
+    return new Gson().fromJson(data, listType);
+  }
+
+  public String get(String token, String argument) throws IOException {
+    String url = zepelinhubUrl + argument;
+    return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
   }
   
-  public String asyncPutWithResponseBody(String url, String json) throws IOException {
+  public String putWithResponseBody(String token, String url, String json) throws IOException {
     if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) {
       LOG.error("Empty note, cannot send it to zeppelinHub");
       throw new IOException("Cannot send emtpy note to zeppelinHub");
     }
-    return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, true);
+    return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
   }
   
-  public void asyncPut(String jsonNote) throws IOException {
+  public void put(String token, String jsonNote) throws IOException {
     if (StringUtils.isBlank(jsonNote)) {
       LOG.error("Cannot save empty note/string to ZeppelinHub");
       return;
     }
-    sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, false);
+    sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
   }
 
-  public void asyncDel(String argument) throws IOException {
+  public void del(String token, String argument) throws IOException {
     if (StringUtils.isBlank(argument)) {
       LOG.error("Cannot delete empty note from ZeppelinHub");
       return;
     }
-    sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, false);
+    sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false);
   }
   
-  private String sendToZeppelinHub(HttpMethod method, String url, String json, boolean withResponse)
+  private String sendToZeppelinHub(HttpMethod method,
+                                   String url,
+                                   String json,
+                                   String token,
+                                   boolean withResponse)
       throws IOException {
     Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token);
     if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST))

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
index 720dd70..1a954e7 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
@@ -13,6 +13,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.NoteInfo;
 import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -20,8 +21,9 @@ import com.google.common.io.Files;
 
 
 public class ZeppelinHubRepoTest {
-  final String TOKEN = "AAA-BBB-CCC-00";
+  final String token = "AAA-BBB-CCC-00";
   final String testAddr = "http://zeppelinhub.ltd";
+  final AuthenticationInfo auth = new AuthenticationInfo("anthony");
 
   private ZeppelinHubRepo repo;
   private File pathOfNotebooks = new File(System.getProperty("user.dir") + "/src/test/resources/list_of_notes");
@@ -30,7 +32,7 @@ public class ZeppelinHubRepoTest {
   @Before
   public void setUp() throws Exception {
     System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr);
-    System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, "AAA-BBB-CCC-00");
+    System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, token);
 
     ZeppelinConfiguration conf = new ZeppelinConfiguration();
     repo = new ZeppelinHubRepo(conf);
@@ -41,10 +43,10 @@ public class ZeppelinHubRepoTest {
     ZeppelinhubRestApiHandler mockedZeppelinhubHandler = mock(ZeppelinhubRestApiHandler.class);
 
     byte[] response = Files.toByteArray(pathOfNotebooks);
-    when(mockedZeppelinhubHandler.asyncGet("")).thenReturn(new String(response));
+    when(mockedZeppelinhubHandler.get("", "")).thenReturn(new String(response));
 
     response =  Files.toByteArray(pathOfNotebook);
-    when(mockedZeppelinhubHandler.asyncGet("AAAAA")).thenReturn(new String(response));
+    when(mockedZeppelinhubHandler.get("", "AAAAA")).thenReturn(new String(response));
 
     return mockedZeppelinhubHandler;
   }
@@ -123,14 +125,14 @@ public class ZeppelinHubRepoTest {
 
   @Test
   public void testGetAllNotes() throws IOException {
-    List<NoteInfo> notebooks = repo.list(null);
+    List<NoteInfo> notebooks = repo.list(auth);
     assertThat(notebooks).isNotEmpty();
     assertThat(notebooks.size()).isEqualTo(3);
   }
   
   @Test
   public void testGetNote() throws IOException {
-    Note notebook = repo.get("AAAAA", null);
+    Note notebook = repo.get("AAAAA", auth);
     assertThat(notebook).isNotNull();
     assertThat(notebook.getId()).isEqualTo("2A94M5J1Z");
   }
@@ -138,13 +140,13 @@ public class ZeppelinHubRepoTest {
   @Test
   public void testRemoveNote() throws IOException {
     // not suppose to throw
-    repo.remove("AAAAA", null);
+    repo.remove("AAAAA", auth);
   }
   
   @Test
   public void testRemoveNoteError() throws IOException {
     // not suppose to throw
-    repo.remove("BBBBB", null);
+    repo.remove("BBBBB", auth);
   }
 
 }