You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2016/09/26 07:50:59 UTC
zeppelin git commit: Zeppelin 1307 - Implement notebook revision in
Zeppelinhub repo
Repository: zeppelin
Updated Branches:
refs/heads/branch-0.6 a22a0355d -> 37f78c78b
Zeppelin 1307 - Implement notebook revision in Zeppelinhub repo
Implement versioning in ZeppelinHub notebook storage.
Improvement
* [x] - Implement Versioning API
* [ZEPPELIN-1307](https://issues.apache.org/jira/browse/ZEPPELIN-1307)
Edit `zeppelin-env.sh` and add `org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo` to `ZEPPELIN_NOTEBOOK_STORAGE`.
* Do the license files need an update? NO
* Are there breaking changes for older versions? NO
* Does this need documentation? NO
Author: Anthony Corbacho <co...@gmail.com>
Closes #1338 from anthonycorbacho/ZEPPELIN-1307 and squashes the following commits:
dd57e7f [Anthony Corbacho] Fix NPE
aef5cf3 [Anthony Corbacho] cleanup code
6cd9251 [Anthony Corbacho] revert change to try-with-resources statement
3b919a9 [Anthony Corbacho] Rework log trace
74a0cdb [Anthony Corbacho] change asyncPutWithResponseBody to accept url instead of noteId
2395a6e [Anthony Corbacho] Light refactor of ZeppelinhubRestApiHandler and extract the API call into a single method
5d4b54b [Anthony Corbacho] Implement checkpoint method
3942a78 [Anthony Corbacho] Implement get revision
9bd0946 [Anthony Corbacho] Close InputStream in asyncGet
(cherry picked from commit 7f733ffb2ef5bfe30418028f696d267046dba833)
Signed-off-by: Mina Lee <mi...@apache.org>
Conflicts:
zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/37f78c78
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/37f78c78
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/37f78c78
Branch: refs/heads/branch-0.6
Commit: 37f78c78b9b0e76642d78ec943d0a1eade8b6823
Parents: a22a035
Author: Anthony Corbacho <co...@gmail.com>
Authored: Tue Aug 23 11:40:09 2016 +0900
Committer: Mina Lee <mi...@apache.org>
Committed: Mon Sep 26 16:50:45 2016 +0900
----------------------------------------------------------------------
.../repo/zeppelinhub/ZeppelinHubRepo.java | 41 +++++--
.../rest/ZeppelinhubRestApiHandler.java | 118 ++++++++-----------
2 files changed, 81 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37f78c78/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
index 45bf0a1..960bcde 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -33,6 +33,8 @@ import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@@ -191,20 +193,45 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
throws IOException {
- // Auto-generated method stub
- return null;
+ if (StringUtils.isBlank(noteId)) {
+ return null;
+ }
+ String endpoint = Joiner.on("/").join(noteId, "checkpoint");
+ String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg));
+ String response = restApiClient.asyncPutWithResponseBody(endpoint, content);
+
+ return GSON.fromJson(response, Revision.class);
}
@Override
- public Note get(String noteId, Revision rev, AuthenticationInfo subject) throws IOException {
- // Auto-generated method stub
- return null;
+ public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
+ if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId)) {
+ return EMPTY_NOTE;
+ }
+ String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
+ String response = restApiClient.asyncGet(endpoint);
+ Note note = GSON.fromJson(response, Note.class);
+ if (note == null) {
+ return EMPTY_NOTE;
+ }
+ LOG.info("ZeppelinHub REST API get note {} revision {}", noteId, revId);
+ return note;
}
@Override
public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
- // Auto-generated method stub
- return null;
+ if (StringUtils.isBlank(noteId)) {
+ return Collections.emptyList();
+ }
+ String endpoint = Joiner.on("/").join(noteId, "checkpoint");
+ List<Revision> history = Collections.emptyList();
+ try {
+ String response = restApiClient.asyncGet(endpoint);
+ history = GSON.fromJson(response, new TypeToken<List<Revision>>(){}.getType());
+ } catch (IOException e) {
+ LOG.error("Cannot get note history", e);
+ }
+ return history;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37f78c78/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
index 8f9b2e5..82159fc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -25,9 +25,8 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
-import org.eclipse.jetty.client.api.Result;
-import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
@@ -115,89 +114,66 @@ public class ZeppelinhubRestApiHandler {
}
public String asyncGet(String argument) throws IOException {
- String note = StringUtils.EMPTY;
+ return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument);
+ }
+
+ public String asyncPutWithResponseBody(String url, String json) throws IOException {
+ if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) {
+ LOG.error("Empty note, cannot send it to zeppelinHub");
+ throw new IOException("Cannot send emtpy note to zeppelinHub");
+ }
+ return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json);
+ }
+
+ public void asyncPut(String jsonNote) throws IOException {
+ if (StringUtils.isBlank(jsonNote)) {
+ LOG.error("Cannot save empty note/string to ZeppelinHub");
+ return;
+ }
+ sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote);
+ }
+ public void asyncDel(String argument) throws IOException {
+ if (StringUtils.isBlank(argument)) {
+ LOG.error("Cannot delete empty note from ZeppelinHub");
+ return;
+ }
+ sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument);
+ }
+
+ private String sendToZeppelinHub(HttpMethod method, String url) throws IOException {
+ return sendToZeppelinHub(method, url, StringUtils.EMPTY);
+ }
+
+ private String sendToZeppelinHub(HttpMethod method, String url, String json) throws IOException {
InputStreamResponseListener listener = new InputStreamResponseListener();
- client.newRequest(zepelinhubUrl + argument)
- .header(ZEPPELIN_TOKEN_HEADER, token)
- .send(listener);
-
- // Wait for the response headers to arrive
Response response;
+ String data;
+
+ Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token);
+ if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST)) &&
+ !StringUtils.isBlank(json)) {
+ request.content(new StringContentProvider(json, "UTF-8"), "application/json;charset=UTF-8");
+ }
+ request.send(listener);
+
try {
response = listener.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
- LOG.error("Cannot perform Get request to ZeppelinHub", e);
- throw new IOException("Cannot load note from ZeppelinHub", e);
+ LOG.error("Cannot perform {} request to ZeppelinHub", method, e);
+ throw new IOException("Cannot perform " + method + " request to ZeppelinHub", e);
}
int code = response.getStatus();
if (code == 200) {
try (InputStream responseContent = listener.getInputStream()) {
- note = IOUtils.toString(responseContent, "UTF-8");
+ data = IOUtils.toString(responseContent, "UTF-8");
}
} else {
- LOG.error("ZeppelinHub Get {} returned with status {} ", zepelinhubUrl + argument, code);
- throw new IOException("Cannot load note from ZeppelinHub");
- }
- return note;
- }
-
- public void asyncPut(String jsonNote) throws IOException {
- if (StringUtils.isBlank(jsonNote)) {
- LOG.error("Cannot save empty note/string to ZeppelinHub");
- return;
- }
-
- client.newRequest(zepelinhubUrl).method(HttpMethod.PUT)
- .header(ZEPPELIN_TOKEN_HEADER, token)
- .content(new StringContentProvider(jsonNote, "UTF-8"), "application/json;charset=UTF-8")
- .send(new BufferingResponseListener() {
-
- @Override
- public void onComplete(Result res) {
- if (!res.isFailed() && res.getResponse().getStatus() == 200) {
- LOG.info("Successfully saved note to ZeppelinHub with {}",
- res.getResponse().getStatus());
- } else {
- LOG.warn("Failed to save note to ZeppelinHub with HttpStatus {}",
- res.getResponse().getStatus());
- }
- }
-
- @Override
- public void onFailure(Response response, Throwable failure) {
- LOG.error("Failed to save note to ZeppelinHub: {}", response.getReason(), failure);
- }
- });
- }
-
- public void asyncDel(String argument) {
- if (StringUtils.isBlank(argument)) {
- LOG.error("Cannot delete empty note from ZeppelinHub");
- return;
+ LOG.error("ZeppelinHub {} {} returned with status {} ", method, url, code);
+ throw new IOException("Cannot perform " + method + " request to ZeppelinHub");
}
- client.newRequest(zepelinhubUrl + argument)
- .method(HttpMethod.DELETE)
- .header(ZEPPELIN_TOKEN_HEADER, token)
- .send(new BufferingResponseListener() {
-
- @Override
- public void onComplete(Result res) {
- if (!res.isFailed() && res.getResponse().getStatus() == 200) {
- LOG.info("Successfully removed note from ZeppelinHub with {}",
- res.getResponse().getStatus());
- } else {
- LOG.warn("Failed to remove note from ZeppelinHub with HttpStatus {}",
- res.getResponse().getStatus());
- }
- }
-
- @Override
- public void onFailure(Response response, Throwable failure) {
- LOG.error("Failed to remove note from ZeppelinHub: {}", response.getReason(), failure);
- }
- });
+ return data;
}
public void close() {