You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2016/09/26 07:50:59 UTC

zeppelin git commit: Zeppelin 1307 - Implement notebook revision in Zeppelinhub repo

Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.6 a22a0355d -> 37f78c78b


Zeppelin 1307 - Implement notebook revision in Zeppelinhub repo

Implement versioning in ZeppelinHub notebook storage.

Improvement

* [x] - Implement Versioning API

 * [ZEPPELIN-1307](https://issues.apache.org/jira/browse/ZEPPELIN-1307)

Edit `zeppelin-env.sh` and add `org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo` in `ZEPPELIN_NOTEBOOK_STORAGE`.

* Do the license files need an update? NO
* Are there breaking changes for older versions? NO
* Does this need documentation? NO

Author: Anthony Corbacho <co...@gmail.com>

Closes #1338 from anthonycorbacho/ZEPPELIN-1307 and squashes the following commits:

dd57e7f [Anthony Corbacho] Fix NPE
aef5cf3 [Anthony Corbacho] cleanup code
6cd9251 [Anthony Corbacho] revert change to try-with-resources statement
3b919a9 [Anthony Corbacho] Rework log trace
74a0cdb [Anthony Corbacho] change asyncPutWithResponseBody to accept url instead of noteId
2395a6e [Anthony Corbacho] Light refactor of ZeppelinHubRestapiHandler and extract api call to a single method
5d4b54b [Anthony Corbacho] Implement checkpoint method
3942a78 [Anthony Corbacho] Implement get revision
9bd0946 [Anthony Corbacho] Close InputStream in asyncGet

(cherry picked from commit 7f733ffb2ef5bfe30418028f696d267046dba833)
Signed-off-by: Mina Lee <mi...@apache.org>

Conflicts:
	zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/37f78c78
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/37f78c78
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/37f78c78

Branch: refs/heads/branch-0.6
Commit: 37f78c78b9b0e76642d78ec943d0a1eade8b6823
Parents: a22a035
Author: Anthony Corbacho <co...@gmail.com>
Authored: Tue Aug 23 11:40:09 2016 +0900
Committer: Mina Lee <mi...@apache.org>
Committed: Mon Sep 26 16:50:45 2016 +0900

----------------------------------------------------------------------
 .../repo/zeppelinhub/ZeppelinHubRepo.java       |  41 +++++--
 .../rest/ZeppelinhubRestApiHandler.java         | 118 ++++++++-----------
 2 files changed, 81 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37f78c78/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
index 45bf0a1..960bcde 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java
@@ -33,6 +33,8 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
@@ -191,20 +193,45 @@ public class ZeppelinHubRepo implements NotebookRepo {
   @Override
   public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
       throws IOException {
-    // Auto-generated method stub
-    return null;
+    if (StringUtils.isBlank(noteId)) {
+      return null;
+    }
+    String endpoint = Joiner.on("/").join(noteId, "checkpoint");
+    String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg));
+    String response = restApiClient.asyncPutWithResponseBody(endpoint, content);
+    
+    return GSON.fromJson(response, Revision.class);
   }
 
   @Override
-  public Note get(String noteId, Revision rev, AuthenticationInfo subject) throws IOException {
-    // Auto-generated method stub
-    return null;
+  public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
+    if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId)) {
+      return EMPTY_NOTE;
+    }
+    String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
+    String response = restApiClient.asyncGet(endpoint);
+    Note note = GSON.fromJson(response, Note.class);
+    if (note == null) {
+      return EMPTY_NOTE;
+    }
+    LOG.info("ZeppelinHub REST API get note {} revision {}", noteId, revId);
+    return note;
   }
 
   @Override
   public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
-    // Auto-generated method stub
-    return null;
+    if (StringUtils.isBlank(noteId)) {
+      return Collections.emptyList();
+    }
+    String endpoint = Joiner.on("/").join(noteId, "checkpoint");
+    List<Revision> history = Collections.emptyList();
+    try {
+      String response = restApiClient.asyncGet(endpoint);
+      history = GSON.fromJson(response, new TypeToken<List<Revision>>(){}.getType());
+    } catch (IOException e) {
+      LOG.error("Cannot get note history", e);
+    }
+    return history;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37f78c78/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
index 8f9b2e5..82159fc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java
@@ -25,9 +25,8 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
-import org.eclipse.jetty.client.api.Result;
-import org.eclipse.jetty.client.util.BufferingResponseListener;
 import org.eclipse.jetty.client.util.InputStreamResponseListener;
 import org.eclipse.jetty.client.util.StringContentProvider;
 import org.eclipse.jetty.http.HttpMethod;
@@ -115,89 +114,66 @@ public class ZeppelinhubRestApiHandler {
   }
 
   public String asyncGet(String argument) throws IOException {
-    String note = StringUtils.EMPTY;
+    return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument);
+  }
+  
+  public String asyncPutWithResponseBody(String url, String json) throws IOException {
+    if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) {
+      LOG.error("Empty note, cannot send it to zeppelinHub");
+      throw new IOException("Cannot send emtpy note to zeppelinHub");
+    }
+    return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json);
+  }
+  
+  public void asyncPut(String jsonNote) throws IOException {
+    if (StringUtils.isBlank(jsonNote)) {
+      LOG.error("Cannot save empty note/string to ZeppelinHub");
+      return;
+    }
+    sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote);
+  }
 
+  public void asyncDel(String argument) throws IOException {
+    if (StringUtils.isBlank(argument)) {
+      LOG.error("Cannot delete empty note from ZeppelinHub");
+      return;
+    }
+    sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument);
+  }
+  
+  private String sendToZeppelinHub(HttpMethod method, String url) throws IOException {
+    return sendToZeppelinHub(method, url, StringUtils.EMPTY);
+  }
+  
+  private String sendToZeppelinHub(HttpMethod method, String url, String json) throws IOException {
     InputStreamResponseListener listener = new InputStreamResponseListener();
-    client.newRequest(zepelinhubUrl + argument)
-          .header(ZEPPELIN_TOKEN_HEADER, token)
-          .send(listener);
-
-    // Wait for the response headers to arrive
     Response response;
+    String data;
+
+    Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token);
+    if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST)) &&
+        !StringUtils.isBlank(json)) {
+      request.content(new StringContentProvider(json, "UTF-8"), "application/json;charset=UTF-8");
+    }
+    request.send(listener);
+
     try {
       response = listener.get(30, TimeUnit.SECONDS);
     } catch (InterruptedException | TimeoutException | ExecutionException e) {
-      LOG.error("Cannot perform Get request to ZeppelinHub", e);
-      throw new IOException("Cannot load note from ZeppelinHub", e);
+      LOG.error("Cannot perform {} request to ZeppelinHub", method, e);
+      throw new IOException("Cannot perform " + method + " request to ZeppelinHub", e);
     }
 
     int code = response.getStatus();
     if (code == 200) {
       try (InputStream responseContent = listener.getInputStream()) {
-        note = IOUtils.toString(responseContent, "UTF-8");
+        data = IOUtils.toString(responseContent, "UTF-8");
       }
     } else {
-      LOG.error("ZeppelinHub Get {} returned with status {} ", zepelinhubUrl + argument, code);
-      throw new IOException("Cannot load note from ZeppelinHub");
-    }
-    return note;
-  }
-
-  public void asyncPut(String jsonNote) throws IOException {
-    if (StringUtils.isBlank(jsonNote)) {
-      LOG.error("Cannot save empty note/string to ZeppelinHub");
-      return;
-    }
-
-    client.newRequest(zepelinhubUrl).method(HttpMethod.PUT)
-        .header(ZEPPELIN_TOKEN_HEADER, token)
-        .content(new StringContentProvider(jsonNote, "UTF-8"), "application/json;charset=UTF-8")
-        .send(new BufferingResponseListener() {
-
-          @Override
-          public void onComplete(Result res) {
-            if (!res.isFailed() && res.getResponse().getStatus() == 200) {
-              LOG.info("Successfully saved note to ZeppelinHub with {}",
-                  res.getResponse().getStatus());
-            } else {
-              LOG.warn("Failed to save note to ZeppelinHub with HttpStatus {}",
-                  res.getResponse().getStatus());
-            }
-          }
-
-          @Override
-          public void onFailure(Response response, Throwable failure) {
-            LOG.error("Failed to save note to ZeppelinHub: {}", response.getReason(), failure);
-          }
-        });
-  }
-
-  public void asyncDel(String argument) {
-    if (StringUtils.isBlank(argument)) {
-      LOG.error("Cannot delete empty note from ZeppelinHub");
-      return;
+      LOG.error("ZeppelinHub {} {} returned with status {} ", method, url, code);
+      throw new IOException("Cannot perform " + method + " request to ZeppelinHub");
     }
-    client.newRequest(zepelinhubUrl + argument)
-        .method(HttpMethod.DELETE)
-        .header(ZEPPELIN_TOKEN_HEADER, token)
-        .send(new BufferingResponseListener() {
-
-          @Override
-          public void onComplete(Result res) {
-            if (!res.isFailed() && res.getResponse().getStatus() == 200) {
-              LOG.info("Successfully removed note from ZeppelinHub with {}",
-                  res.getResponse().getStatus());
-            } else {
-              LOG.warn("Failed to remove note from ZeppelinHub with HttpStatus {}",
-                  res.getResponse().getStatus());
-            }
-          }
-
-          @Override
-          public void onFailure(Response response, Throwable failure) {
-            LOG.error("Failed to remove note from ZeppelinHub: {}", response.getReason(), failure);
-          }
-        });
+    return data;
   }
 
   public void close() {