You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by an...@apache.org on 2016/11/24 11:56:27 UTC
zeppelin git commit: [ZEPPELIN-1690] - ZeppelinHubNotebookRepo multy
user handling
Repository: zeppelin
Updated Branches:
refs/heads/master 3389e8cfc -> 33e2dab37
[ZEPPELIN-1690] - ZeppelinHubNotebookRepo multy user handling
### What is this PR for?
This PR brings multi-user handling to ZeppelinHubNotebookRepo.
### What type of PR is it?
[Improvement]
### What is the Jira issue?
* [ZEPPELIN-1690](https://issues.apache.org/jira/browse/ZEPPELIN-1690)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Anthony Corbacho <co...@gmail.com>
Closes #1635 from anthonycorbacho/feat/ZeppelinHubRepoMultiUser and squashes the following commits:
d9e0036 [Anthony Corbacho] Move code location
d8989aa [Anthony Corbacho] Handle invalid subject
aef6e00 [Anthony Corbacho] Fix check style
edb9e8c [Anthony Corbacho] Desactivate ws :: we will need to refactor this part
e884203 [Anthony Corbacho] Refactor :: remove 'async' on every http call
dbb8ebd [Anthony Corbacho] Fix test
25f6215 [Anthony Corbacho] pass user token to zeppelinhub rest api handler
674fb93 [Anthony Corbacho] Refactor ZeppelinHub rest API handler - Now takes a token on every http request
3fbfcfa [Anthony Corbacho] Add new login on how user can get his token at runtime
a8aeb51 [Anthony Corbacho] add comment in zeppelinhubRealm about saving user session in a singleton map
5931ab6 [Anthony Corbacho] Fix check style
67051a0 [Anthony Corbacho] Add ZeppelinHub instance model
e3e5a15 [Anthony Corbacho] Add userTiket in AuthenticationInfo on OnMessage method in notebookServer
7a0c959 [Anthony Corbacho] Add zeppelinhub user session to userSession container after login throght zeppelinhubRealm
0729f51 [Anthony Corbacho] Add zeppelinhub session container
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/33e2dab3
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/33e2dab3
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/33e2dab3
Branch: refs/heads/master
Commit: 33e2dab37ef94aecc262b1b8f75e668cd0db0e60
Parents: 3389e8c
Author: Anthony Corbacho <co...@gmail.com>
Authored: Wed Nov 23 14:43:59 2016 +0900
Committer: Anthony Corbacho <co...@gmail.com>
Committed: Thu Nov 24 20:56:17 2016 +0900
----------------------------------------------------------------------
.../apache/zeppelin/realm/ZeppelinHubRealm.java | 11 +-
.../apache/zeppelin/socket/NotebookServer.java | 3 +-
.../repo/zeppelinhub/ZeppelinHubRepo.java | 111 +++++++++++++++----
.../repo/zeppelinhub/model/Instance.java | 27 +++++
.../zeppelinhub/model/UserSessionContainer.java | 54 +++++++++
.../rest/ZeppelinhubRestApiHandler.java | 71 +++++++++---
.../repo/zeppelinhub/ZeppelinHubRepoTest.java | 18 +--
7 files changed, 249 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
index 67ed544..c37a0a8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java
@@ -37,6 +37,7 @@ import org.apache.shiro.authc.UsernamePasswordToken;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.server.ZeppelinServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
private static final String USER_LOGIN_API_ENDPOINT = "api/v1/users/login";
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String UTF_8_ENCODING = "UTF-8";
+ private static final String USER_SESSION_HEADER = "X-session";
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
private final HttpClient httpClient;
@@ -126,6 +128,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
protected User authenticateUser(String requestBody) {
PutMethod put = new PutMethod(Joiner.on("/").join(zeppelinhubUrl, USER_LOGIN_API_ENDPOINT));
String responseBody = StringUtils.EMPTY;
+ String userSession = StringUtils.EMPTY;
try {
put.setRequestEntity(new StringRequestEntity(requestBody, JSON_CONTENT_TYPE, UTF_8_ENCODING));
int statusCode = httpClient.executeMethod(put);
@@ -136,6 +139,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
+ "Login or password incorrect");
}
responseBody = put.getResponseBodyAsString();
+ userSession = put.getResponseHeader(USER_SESSION_HEADER).getValue();
put.releaseConnection();
} catch (IOException e) {
@@ -150,13 +154,16 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
throw new AuthenticationException("Cannot login to ZeppelinHub");
}
-
+
+ // Add ZeppelinHub user_session token this singleton map, this will help ZeppelinHubRepo
+ // to get specific information about the current user.
+ UserSessionContainer.instance.setSession(account.login, userSession);
+
/* TODO(khalid): add proper roles and add listener */
HashSet<String> userAndRoles = new HashSet<String>();
userAndRoles.add(account.login);
ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);
-
return account;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index ebc5755..c5a39b3 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -190,7 +190,8 @@ public class NotebookServer extends WebSocketServlet implements
if (StringUtils.isEmpty(conn.getUser())) {
addUserConnection(messagereceived.principal, conn);
}
- AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
+ AuthenticationInfo subject =
+ new AuthenticationInfo(messagereceived.principal, messagereceived.ticket);
/** Lets be elegant here */
switch (messagereceived.op) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
index a4b6202..e941c0d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -22,6 +22,8 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -29,8 +31,9 @@ import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
-import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,20 +54,27 @@ public class ZeppelinHubRepo implements NotebookRepo {
public static final String TOKEN_HEADER = "X-Zeppelin-Token";
private static final Gson GSON = new Gson();
private static final Note EMPTY_NOTE = new Note();
- private final Client websocketClient;
+ //private final Client websocketClient;
private String token;
private ZeppelinhubRestApiHandler restApiClient;
+
+ private final ZeppelinConfiguration conf;
+ // In order to avoid too many call to ZeppelinHub backend, we save a map of user -> session.
+ private ConcurrentMap<String, String> usersToken = new ConcurrentHashMap<String, String>();
+
public ZeppelinHubRepo(ZeppelinConfiguration conf) {
+ this.conf = conf;
String zeppelinHubUrl = getZeppelinHubUrl(conf);
LOG.info("Initializing ZeppelinHub integration module");
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
- websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
- getZeppelinhubWebsocketUri(conf), token, conf);
- websocketClient.start();
+ // TODO(xxx): refactor this in the next itaration
+ //websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
+ // getZeppelinhubWebsocketUri(conf), token, conf);
+ //websocketClient.start();
}
private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
@@ -144,10 +154,56 @@ public class ZeppelinHubRepo implements NotebookRepo {
}
return zeppelinhubUrl;
}
+
+ /**
+ * Get Token directly from Zeppelinhub.
+ * This will avoid and remove the needs of setting up token in zeppelin-env.sh.
+ */
+ private String getUserZeppelinInstanceToken(String ticket) throws IOException {
+ if (StringUtils.isBlank(ticket)) {
+ return "";
+ }
+
+ List<Instance> instances = restApiClient.getInstances(ticket);
+ // TODO(anthony): Implement NotebookRepo Setting to let user switch token at runtime.
+
+ token = instances.isEmpty() ? StringUtils.EMPTY : instances.get(0).token;
+ return token;
+ }
+
+ /**
+ * For a given user logged in is zeppelin (via zeppelinhub notebook repo), get default token.
+ * */
+ private String getUserToken(String principal) {
+ String token = usersToken.get(principal);
+ if (StringUtils.isBlank(token)) {
+ String ticket = UserSessionContainer.instance.getSession(principal);
+ try {
+ token = getUserZeppelinInstanceToken(ticket);
+ usersToken.putIfAbsent(principal, token);
+ } catch (IOException e) {
+ LOG.error("Cannot get user token", e);
+ token = StringUtils.EMPTY;
+ }
+ }
+
+ return token;
+ }
+ private boolean isSubjectValid(AuthenticationInfo subject) {
+ if (subject == null) {
+ return false;
+ }
+ return (subject.isAnonymous() && !conf.isAnonymousAllowed()) ? false : true;
+ }
+
@Override
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
- String response = restApiClient.asyncGet("");
+ if (!isSubjectValid(subject)) {
+ return Collections.emptyList();
+ }
+ String token = getUserToken(subject.getUser());
+ String response = restApiClient.get(token, StringUtils.EMPTY);
List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
if (notes == null) {
return Collections.emptyList();
@@ -158,11 +214,11 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public Note get(String noteId, AuthenticationInfo subject) throws IOException {
- if (StringUtils.isBlank(noteId)) {
+ if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
- //String response = zeppelinhubHandler.get(noteId);
- String response = restApiClient.asyncGet(noteId);
+ String token = getUserToken(subject.getUser());
+ String response = restApiClient.get(token, noteId);
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
@@ -173,45 +229,55 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public void save(Note note, AuthenticationInfo subject) throws IOException {
- if (note == null) {
- throw new IOException("Zeppelinhub failed to save empty note");
+ if (note == null || !isSubjectValid(subject)) {
+ throw new IOException("Zeppelinhub failed to save note");
}
- String notebook = GSON.toJson(note);
- restApiClient.asyncPut(notebook);
- LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
+ String jsonNote = GSON.toJson(note);
+ String token = getUserToken(subject.getUser());
+ LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
+ restApiClient.put(token, jsonNote);
}
@Override
public void remove(String noteId, AuthenticationInfo subject) throws IOException {
- restApiClient.asyncDel(noteId);
+ if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
+ throw new IOException("Zeppelinhub failed to remove note");
+ }
+ String token = getUserToken(subject.getUser());
LOG.info("ZeppelinHub REST API removing note {} ", noteId);
+ restApiClient.del(token, noteId);
}
@Override
public void close() {
- websocketClient.stop();
+ //websocketClient.stop();
}
@Override
public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
throws IOException {
- if (StringUtils.isBlank(noteId)) {
+ if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return null;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg));
- String response = restApiClient.asyncPutWithResponseBody(endpoint, content);
+ String token = getUserToken(subject.getUser());
+ String response = restApiClient.putWithResponseBody(token, endpoint, content);
+
return GSON.fromJson(response, Revision.class);
}
@Override
public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
- if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId)) {
+ if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
- String response = restApiClient.asyncGet(endpoint);
+
+ String token = getUserToken(subject.getUser());
+ String response = restApiClient.get(token, endpoint);
+
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
@@ -222,13 +288,14 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
- if (StringUtils.isBlank(noteId)) {
+ if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return Collections.emptyList();
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
List<Revision> history = Collections.emptyList();
try {
- String response = restApiClient.asyncGet(endpoint);
+ String token = getUserToken(subject.getUser());
+ String response = restApiClient.get(token, endpoint);
history = GSON.fromJson(response, new TypeToken<List<Revision>>(){}.getType());
} catch (IOException e) {
LOG.error("Cannot get note history", e);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java
new file mode 100644
index 0000000..2767962
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/Instance.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.model;
+
+/**
+ * ZeppelinHub Instance structure.
+ *
+ */
+public class Instance {
+ public int id;
+ public String name;
+ public String token;
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
new file mode 100644
index 0000000..c741e77
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/model/UserSessionContainer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.model;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Simple and yet dummy container for zeppelinhub session.
+ *
+ */
+public class UserSessionContainer {
+ private static class Entity {
+ public final String userSession;
+
+ Entity(String userSession) {
+ this.userSession = userSession;
+ }
+ }
+
+ private Map<String, Entity> sessions = new ConcurrentHashMap<>();
+
+ public static final UserSessionContainer instance = new UserSessionContainer();
+
+ public synchronized String getSession(String principal) {
+ Entity entry = sessions.get(principal);
+ if (entry == null) {
+ return StringUtils.EMPTY;
+ }
+ return entry.userSession;
+ }
+
+ public synchronized String setSession(String principal, String userSession) {
+ Entity entry = new Entity(userSession);
+ sessions.put(principal, entry);
+ return entry.userSession;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
index 63699e6..a913d85 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -18,12 +18,16 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@@ -35,6 +39,9 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
/**
* REST API handler.
*
@@ -42,6 +49,7 @@ import org.slf4j.LoggerFactory;
public class ZeppelinhubRestApiHandler {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
+ private static final String USER_SESSION_HEADER = "X-User-Session";
private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
private static boolean PROXY_ON = false;
private static String PROXY_HOST;
@@ -49,16 +57,13 @@ public class ZeppelinhubRestApiHandler {
private final HttpClient client;
private final String zepelinhubUrl;
- private final String token;
- public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl,
- String token) {
+ public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl, String token) {
return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
}
private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
- this.token = token;
//TODO(khalid):to make proxy conf consistent with Zeppelin confs
//readProxyConf();
@@ -114,35 +119,75 @@ public class ZeppelinhubRestApiHandler {
return httpClient;
}
- public String asyncGet(String argument) throws IOException {
- return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument, StringUtils.EMPTY, true);
+ /**
+ * Fetch zeppelin instances for a given user.
+ * @param ticket
+ * @return
+ * @throws IOException
+ */
+ public List<Instance> getInstances(String ticket) throws IOException {
+ InputStreamResponseListener listener = new InputStreamResponseListener();
+ Response response;
+ String url = zepelinhubUrl + "instances";
+ String data;
+
+ Request request = client.newRequest(url).header(USER_SESSION_HEADER, ticket);
+ request.send(listener);
+
+ try {
+ response = listener.get(30, TimeUnit.SECONDS);
+ } catch (InterruptedException | TimeoutException | ExecutionException e) {
+ LOG.error("Cannot perform request to ZeppelinHub", e);
+ throw new IOException("Cannot perform GET request to ZeppelinHub", e);
+ }
+
+ int code = response.getStatus();
+ if (code == 200) {
+ try (InputStream responseContent = listener.getInputStream()) {
+ data = IOUtils.toString(responseContent, "UTF-8");
+ }
+ } else {
+ LOG.error("ZeppelinHub GET {} returned with status {} ", url, code);
+ throw new IOException("Cannot perform GET request to ZeppelinHub");
+ }
+ Type listType = new TypeToken<ArrayList<Instance>>() {}.getType();
+ return new Gson().fromJson(data, listType);
+ }
+
+ public String get(String token, String argument) throws IOException {
+ String url = zepelinhubUrl + argument;
+ return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
}
- public String asyncPutWithResponseBody(String url, String json) throws IOException {
+ public String putWithResponseBody(String token, String url, String json) throws IOException {
if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) {
LOG.error("Empty note, cannot send it to zeppelinHub");
throw new IOException("Cannot send emtpy note to zeppelinHub");
}
- return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, true);
+ return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
}
- public void asyncPut(String jsonNote) throws IOException {
+ public void put(String token, String jsonNote) throws IOException {
if (StringUtils.isBlank(jsonNote)) {
LOG.error("Cannot save empty note/string to ZeppelinHub");
return;
}
- sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, false);
+ sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
}
- public void asyncDel(String argument) throws IOException {
+ public void del(String token, String argument) throws IOException {
if (StringUtils.isBlank(argument)) {
LOG.error("Cannot delete empty note from ZeppelinHub");
return;
}
- sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, false);
+ sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false);
}
- private String sendToZeppelinHub(HttpMethod method, String url, String json, boolean withResponse)
+ private String sendToZeppelinHub(HttpMethod method,
+ String url,
+ String json,
+ String token,
+ boolean withResponse)
throws IOException {
Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token);
if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST))
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/33e2dab3/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
index 720dd70..1a954e7 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepoTest.java
@@ -13,6 +13,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
+import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
@@ -20,8 +21,9 @@ import com.google.common.io.Files;
public class ZeppelinHubRepoTest {
- final String TOKEN = "AAA-BBB-CCC-00";
+ final String token = "AAA-BBB-CCC-00";
final String testAddr = "http://zeppelinhub.ltd";
+ final AuthenticationInfo auth = new AuthenticationInfo("anthony");
private ZeppelinHubRepo repo;
private File pathOfNotebooks = new File(System.getProperty("user.dir") + "/src/test/resources/list_of_notes");
@@ -30,7 +32,7 @@ public class ZeppelinHubRepoTest {
@Before
public void setUp() throws Exception {
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr);
- System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, "AAA-BBB-CCC-00");
+ System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, token);
ZeppelinConfiguration conf = new ZeppelinConfiguration();
repo = new ZeppelinHubRepo(conf);
@@ -41,10 +43,10 @@ public class ZeppelinHubRepoTest {
ZeppelinhubRestApiHandler mockedZeppelinhubHandler = mock(ZeppelinhubRestApiHandler.class);
byte[] response = Files.toByteArray(pathOfNotebooks);
- when(mockedZeppelinhubHandler.asyncGet("")).thenReturn(new String(response));
+ when(mockedZeppelinhubHandler.get("", "")).thenReturn(new String(response));
response = Files.toByteArray(pathOfNotebook);
- when(mockedZeppelinhubHandler.asyncGet("AAAAA")).thenReturn(new String(response));
+ when(mockedZeppelinhubHandler.get("", "AAAAA")).thenReturn(new String(response));
return mockedZeppelinhubHandler;
}
@@ -123,14 +125,14 @@ public class ZeppelinHubRepoTest {
@Test
public void testGetAllNotes() throws IOException {
- List<NoteInfo> notebooks = repo.list(null);
+ List<NoteInfo> notebooks = repo.list(auth);
assertThat(notebooks).isNotEmpty();
assertThat(notebooks.size()).isEqualTo(3);
}
@Test
public void testGetNote() throws IOException {
- Note notebook = repo.get("AAAAA", null);
+ Note notebook = repo.get("AAAAA", auth);
assertThat(notebook).isNotNull();
assertThat(notebook.getId()).isEqualTo("2A94M5J1Z");
}
@@ -138,13 +140,13 @@ public class ZeppelinHubRepoTest {
@Test
public void testRemoveNote() throws IOException {
// not suppose to throw
- repo.remove("AAAAA", null);
+ repo.remove("AAAAA", auth);
}
@Test
public void testRemoveNoteError() throws IOException {
// not suppose to throw
- repo.remove("BBBBB", null);
+ repo.remove("BBBBB", auth);
}
}