You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/04/25 03:55:24 UTC

[zeppelin] branch master updated: [ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308)

This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 239f8bf8b4 [ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308)
239f8bf8b4 is described below

commit 239f8bf8b409c6d94d9e38643e4051d685af6b5b
Author: Guanhua Li <gu...@foxmail.com>
AuthorDate: Mon Apr 25 11:55:15 2022 +0800

    [ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308)
    
    * OSSNotebookRepo support version control.
    
    * format
    
    * OSSNotebokRepo remove dependenc of ossclient
    
    * Some optimization
    
    * add test
    
    * ADD MOCK service to mock OSSOperator in localFileSystem
    
    * fix bug of rootFolder path in OSSNotebookRepo
    
    * Update for reviews.
    
    * Insert the license header into MockStorageOperator.java and RemoteStorageOperator.java.
    
    * update doc
    
    * configuration to disable version control in OSSNoteBookRepo
    
    * update for reviews
    
    * Update zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
    
    To key a naming convention.
    
    Co-authored-by: Jongyoul Lee <jo...@gmail.com>
    
    Co-authored-by: Jongyoul Lee <jo...@gmail.com>
---
 docs/setup/storage/storage.md                      |  10 +
 .../zeppelin/conf/ZeppelinConfiguration.java       |   5 +
 .../zeppelin/notebook/repo/OSSNotebookRepo.java    | 290 +++++++++++++--------
 .../zeppelin/notebook/repo/RevisionsInfo.java      |  39 +++
 .../notebook/repo/storage/OSSOperator.java         | 153 +++++++++++
 .../repo/storage/RemoteStorageOperator.java        |  48 ++++
 .../notebook/repo/MockStorageOperator.java         | 122 +++++++++
 .../notebook/repo/OSSNotebookRepoTest.java         | 243 +++++++++++++++++
 8 files changed, 799 insertions(+), 111 deletions(-)

diff --git a/docs/setup/storage/storage.md b/docs/setup/storage/storage.md
index dc85cba219..bbc9583728 100644
--- a/docs/setup/storage/storage.md
+++ b/docs/setup/storage/storage.md
@@ -454,6 +454,16 @@ And you should configure oss related properties in file **zeppelin-site.xml**.
   <description>Access key secret for your OSS account</description>
 </property>
 
+<property>
+  <name>zeppelin.notebook.oss.version.max</name>
+  <value>30</value>
+  <description>
+    Max num of note versions in OSSNoteBookRepo. 
+    It's not mandatory, the default value is 30. 
+    A value of 0 means that version control in OSSNoteBookRepo is disabled.
+  </description>
+</property>
+
 ```
 
 Uncomment the next property for use OSSNotebookRepo class:
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 8021e20ad5..96a4b10de8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -500,6 +500,10 @@ public class ZeppelinConfiguration {
     return getString(ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET);
   }
 
+  public int getOSSNoteMaxVersionNum(){
+    return getInt(ConfVars.ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX);
+  }
+
   public String getMongoUri() {
     return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI);
   }
@@ -973,6 +977,7 @@ public class ZeppelinConfiguration {
     ZEPPELIN_NOTEBOOK_OSS_ENDPOINT("zeppelin.notebook.oss.endpoint", "http://oss-cn-hangzhou.aliyuncs.com"),
     ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYID("zeppelin.notebook.oss.accesskeyid", null),
     ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET("zeppelin.notebook.oss.accesskeysecret", null),
+    ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX("zeppelin.notebook.oss.version.max", 30),
     ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null),
     ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
     ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
index 81096dbc6f..5fddf9904b 100644
--- a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
@@ -17,43 +17,33 @@
 
 package org.apache.zeppelin.notebook.repo;
 
-import com.aliyun.oss.OSS;
-import com.aliyun.oss.OSSClientBuilder;
-import com.aliyun.oss.model.CopyObjectRequest;
-import com.aliyun.oss.model.DeleteObjectsRequest;
-import com.aliyun.oss.model.ListObjectsRequest;
-import com.aliyun.oss.model.OSSObject;
-import com.aliyun.oss.model.OSSObjectSummary;
-import com.aliyun.oss.model.ObjectListing;
-import com.aliyun.oss.model.PutObjectRequest;
-import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.repo.storage.OSSOperator;
+import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 
 /**
  * NotebookRepo for Aliyun OSS (https://cn.aliyun.com/product/oss)
  */
-public class OSSNotebookRepo implements NotebookRepo {
+public class OSSNotebookRepo implements NotebookRepoWithVersionControl {
   private static final Logger LOGGER = LoggerFactory.getLogger(OSSNotebookRepo.class);
 
-  private OSS ossClient;
   private String bucketName;
   private String rootFolder;
+  private int maxVersionNumber;
+
+  // Use ossOperator instead of ossClient directly
+  private RemoteStorageOperator ossOperator;
 
   public OSSNotebookRepo() {
   }
@@ -63,144 +53,132 @@ public class OSSNotebookRepo implements NotebookRepo {
     String endpoint = conf.getOSSEndpoint();
     bucketName = conf.getOSSBucketName();
     rootFolder = conf.getNotebookDir();
-    // rootFolder is part of OSS key which doesn't start with '/'
-    if (rootFolder.startsWith("/")) {
-      rootFolder = rootFolder.substring(1);
-    }
+    maxVersionNumber = conf.getOSSNoteMaxVersionNum();
+    // rootFolder is part of OSS key
+    rootFolder = formatPath(rootFolder);
     String accessKeyId = conf.getOSSAccessKeyId();
     String accessKeySecret = conf.getOSSAccessKeySecret();
-    this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
+    this.ossOperator = new OSSOperator(endpoint, accessKeyId, accessKeySecret);
+  }
+
+  private static String formatPath(String path) {
+    // The path should not start with '/' or './' or './/'
+    // because it is not accepted by OSS service.
+    if (path.startsWith("/")) {
+      path = path.substring(1);
+    }
+    path = new File(path).getPath();
+    if (path.startsWith("./")) {
+      path = path.substring(2);
+    }
+    return path;
+  }
+
+  public void setOssOperator(RemoteStorageOperator ossOperator) {
+    this.ossOperator = ossOperator;
   }
 
   @Override
   public Map<String, NoteInfo> list(AuthenticationInfo subject) throws IOException {
     Map<String, NoteInfo> notesInfo = new HashMap<>();
-    final int maxKeys = 200;
-    String nextMarker = null;
-    ObjectListing objectListing = null;
-    do {
-      objectListing = ossClient.listObjects(
-              new ListObjectsRequest(bucketName)
-                      .withPrefix(rootFolder + "/")
-                      .withMarker(nextMarker)
-                      .withMaxKeys(maxKeys));
-      List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
-      for (OSSObjectSummary s : sums) {
-        if (s.getKey().endsWith(".zpln")) {
-          try {
-            String noteId = getNoteId(s.getKey());
-            String notePath = getNotePath(rootFolder, s.getKey());
-            notesInfo.put(noteId, new NoteInfo(noteId, notePath));
-          } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-          }
-        } else {
-          LOGGER.debug("Skip invalid note file: {}", s.getKey());
+    List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + "/");
+    for (String key : objectKeys) {
+      if (key.endsWith(".zpln")) {
+        try {
+          String noteId = getNoteId(key);
+          String notePath = getNotePath(rootFolder, key);
+          notesInfo.put(noteId, new NoteInfo(noteId, notePath));
+        } catch (IOException e) {
+          LOGGER.warn(e.getMessage());
         }
+      } else {
+        LOGGER.debug("Skip invalid note file: {}", key);
       }
-      nextMarker = objectListing.getNextMarker();
-    } while (objectListing.isTruncated());
-
+    }
     return notesInfo;
   }
 
+  public Note getByOSSPath(String noteId, String ossPath) throws IOException {
+    String noteText = ossOperator.getTextObject(bucketName, ossPath);
+    return Note.fromJson(noteId, noteText);
+  }
+
+
   @Override
   public Note get(String noteId, String notePath, AuthenticationInfo subject) throws IOException {
-    OSSObject ossObject = ossClient.getObject(bucketName,
-            rootFolder + "/" + buildNoteFileName(noteId, notePath));
-    InputStream in = null;
-    try {
-      in = ossObject.getObjectContent();
-      return Note.fromJson(noteId, IOUtils.toString(in, StandardCharsets.UTF_8));
-    } finally {
-      if (in != null) {
-        in.close();
-      }
-    }
+    return getByOSSPath(noteId, rootFolder + "/" + buildNoteFileName(noteId, notePath));
   }
 
   @Override
   public void save(Note note, AuthenticationInfo subject) throws IOException {
     String content = note.toJson();
-    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName,
+    ossOperator.putTextObject(bucketName,
             rootFolder + "/" + buildNoteFileName(note.getId(), note.getPath()),
             new ByteArrayInputStream(content.getBytes()));
-    ossClient.putObject(putObjectRequest);
   }
 
   @Override
   public void move(String noteId, String notePath, String newNotePath,
                    AuthenticationInfo subject) throws IOException {
-    String sourceKey = rootFolder + "/" + buildNoteFileName(noteId, notePath);
-    String destKey = rootFolder + "/" + buildNoteFileName(noteId, newNotePath);
-    CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName,
-            sourceKey, bucketName, destKey);
-    ossClient.copyObject(copyObjectRequest);
-    ossClient.deleteObject(bucketName, sourceKey);
+    String noteSourceKey = rootFolder + "/" + buildNoteFileName(noteId, notePath);
+    String noteDestKey = rootFolder + "/" + buildNoteFileName(noteId, newNotePath);
+    ossOperator.moveObject(bucketName, noteSourceKey, noteDestKey);
+    String revisionSourceDirKey = rootFolder + "/" + buildRevisionsDirName(noteId, notePath);
+    String revisionDestDirKey = rootFolder + "/" + buildRevisionsDirName(noteId, newNotePath);
+    ossOperator.moveDir(bucketName, revisionSourceDirKey, revisionDestDirKey);
   }
 
   @Override
   public void move(String folderPath, String newFolderPath, AuthenticationInfo subject) {
-    final int maxKeys = 200;
-    String nextMarker = null;
-    ObjectListing objectListing = null;
-    do {
-      objectListing = ossClient.listObjects(
-              new ListObjectsRequest(bucketName)
-                      .withPrefix(rootFolder + folderPath + "/")
-                      .withMarker(nextMarker)
-                      .withMaxKeys(maxKeys));
-      List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
-      for (OSSObjectSummary s : sums) {
-        if (s.getKey().endsWith(".zpln")) {
-          try {
-            String noteId = getNoteId(s.getKey());
-            String notePath = getNotePath(rootFolder, s.getKey());
-            String newNotePath = newFolderPath + notePath.substring(folderPath.length());
-            move(noteId, notePath, newNotePath, subject);
-          } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-          }
-        } else {
-          LOGGER.debug("Skip invalid note file: {}", s.getKey());
+    List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + folderPath + "/");
+    for (String key : objectKeys) {
+      if (key.endsWith(".zpln")) {
+        try {
+          String noteId = getNoteId(key);
+          String notePath = getNotePath(rootFolder, key);
+          String newNotePath = newFolderPath + notePath.substring(folderPath.length());
+          move(noteId, notePath, newNotePath, subject);
+        } catch (IOException e) {
+          LOGGER.warn(e.getMessage());
         }
+      } else {
+        LOGGER.debug("Skip invalid note file: {}", key);
       }
-      nextMarker = objectListing.getNextMarker();
-    } while (objectListing.isTruncated());
+    }
 
   }
 
   @Override
   public void remove(String noteId, String notePath, AuthenticationInfo subject)
-      throws IOException {
-    ossClient.deleteObject(bucketName, rootFolder + "/" + buildNoteFileName(noteId, notePath));
+          throws IOException {
+    ossOperator.deleteFile(bucketName, rootFolder + "/" + buildNoteFileName(noteId, notePath));
+    // if there is no file under revisonInfoPath, deleleDir() would do nothing
+    ossOperator.deleteDir(bucketName, rootFolder + "/" + buildRevisionsDirName(noteId, notePath));
   }
 
   @Override
-  public void remove(String folderPath, AuthenticationInfo subject) {
-    String nextMarker = null;
-    ObjectListing objectListing = null;
-    do {
-      ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName)
-              .withPrefix(rootFolder + folderPath + "/")
-              .withMarker(nextMarker);
-      objectListing = ossClient.listObjects(listObjectsRequest);
-      if (!objectListing.getObjectSummaries().isEmpty()) {
-        List<String> keys = new ArrayList<>();
-        for (OSSObjectSummary s : objectListing.getObjectSummaries()) {
-          keys.add(s.getKey());
+  public void remove(String folderPath, AuthenticationInfo subject) throws IOException {
+    List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + folderPath + "/");
+    for (String key : objectKeys) {
+      if (key.endsWith(".zpln")) {
+        try {
+          String noteId = getNoteId(key);
+          String notePath = getNotePath(rootFolder, key);
+          // delete note revision file
+          ossOperator.deleteDir(bucketName, rootFolder + "/" + buildRevisionsDirName(noteId, notePath));
+        } catch (IOException e) {
+          LOGGER.warn(e.getMessage());
         }
-        DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(keys);
-        ossClient.deleteObjects(deleteObjectsRequest);
       }
-
-      nextMarker = objectListing.getNextMarker();
-    } while (objectListing.isTruncated());
+    }
+    // delete note file
+    ossOperator.deleteFiles(bucketName, objectKeys);
   }
 
+
   @Override
   public void close() {
-    ossClient.shutdown();
+    ossOperator.shutdown();
   }
 
   @Override
@@ -214,4 +192,94 @@ public class OSSNotebookRepo implements NotebookRepo {
     LOGGER.warn("Method not implemented");
   }
 
+
+  private static String buildRevisionsDirName(String noteId, String notePath) throws IOException {
+    if (!notePath.startsWith("/")) {
+      throw new IOException("Invalid notePath: " + notePath);
+    }
+    return ".checkpoint/" + (notePath + "_" + noteId).substring(1);
+  }
+
+  private String buildRevisionsInfoAbsolutePath(String noteId, String notePath) throws IOException {
+    return rootFolder + "/" + buildRevisionsDirName(noteId, notePath) + "/" + ".revision-info";
+  }
+
+  private String buildRevisionsFileAbsolutePath(String noteId, String notePath, String revisionId) throws IOException {
+    return rootFolder + "/" + buildRevisionsDirName(noteId, notePath) + "/" + revisionId;
+  }
+
+
+  @Override
+  public Revision checkpoint(String noteId, String notePath, String checkpointMsg, AuthenticationInfo subject) throws IOException {
+    if (maxVersionNumber <= 0) {
+      throw new IOException("Version control is closed because the value of zeppelin.notebook.oss.version.max is set to 0");
+    }
+
+    Note note = get(noteId, notePath, subject);
+
+    //1 Write note content to revision file
+    String revisionId = UUID.randomUUID().toString().replace("-", "");
+    String noteContent = note.toJson();
+    ossOperator.putTextObject(bucketName,
+            buildRevisionsFileAbsolutePath(noteId, notePath, revisionId),
+            new ByteArrayInputStream(noteContent.getBytes()));
+
+    //2 Append revision info
+    Revision revision = new Revision(revisionId, checkpointMsg, (int) (System.currentTimeMillis() / 1000L));
+    // check revision info file if existed
+    RevisionsInfo revisionsHistory = new RevisionsInfo();
+    String revisonInfoPath = buildRevisionsInfoAbsolutePath(noteId, notePath);
+    boolean found = ossOperator.doesObjectExist(bucketName, revisonInfoPath);
+    if (found) {
+      String existedRevisionsInfoText = ossOperator.getTextObject(bucketName, revisonInfoPath);
+      revisionsHistory = RevisionsInfo.fromText(existedRevisionsInfoText);
+      // control the num of revison files, clean the oldest one if it exceeds.
+      if (revisionsHistory.size() >= maxVersionNumber) {
+        Revision deletedRevision = revisionsHistory.removeLast();
+        ossOperator.deleteFile(bucketName, buildRevisionsFileAbsolutePath(noteId, notePath, deletedRevision.id));
+      }
+    }
+    revisionsHistory.addFirst(revision);
+
+    ossOperator.putTextObject(bucketName,
+            buildRevisionsInfoAbsolutePath(noteId, notePath),
+            new ByteArrayInputStream(revisionsHistory.toText().getBytes()));
+
+    return revision;
+  }
+
+  @Override
+  public Note get(String noteId, String notePath, String revId, AuthenticationInfo subject) throws IOException {
+    Note note = getByOSSPath(noteId,  buildRevisionsFileAbsolutePath(noteId, notePath, revId));
+    if (note != null) {
+      note.setPath(notePath);
+    }
+    return note;
+  }
+
+  @Override
+  public List<Revision> revisionHistory(String noteId, String notePath, AuthenticationInfo subject) throws IOException {
+    if (maxVersionNumber <= 0) {
+      return new ArrayList<>();
+    }
+
+    List<Revision> revisions = new LinkedList<>();
+    String revisonInfoPath = buildRevisionsInfoAbsolutePath(noteId, notePath);
+    boolean found = ossOperator.doesObjectExist(bucketName, revisonInfoPath);
+    if (!found) {
+      return revisions;
+    }
+    String revisionsText = ossOperator.getTextObject(bucketName, revisonInfoPath);
+
+    return RevisionsInfo.fromText(revisionsText);
+  }
+
+  @Override
+  public Note setNoteRevision(String noteId, String notePath, String revId, AuthenticationInfo subject) throws IOException {
+    Note revisionNote = get(noteId, notePath, revId, subject);
+    if (revisionNote != null) {
+      save(revisionNote, subject);
+    }
+    return revisionNote;
+  }
 }
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java
new file mode 100644
index 0000000000..44b2aaa9cc
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import com.google.gson.Gson;
+
+import java.util.LinkedList;
+
+public class RevisionsInfo extends LinkedList<NotebookRepoWithVersionControl.Revision> {
+
+  private static Gson GSON = new Gson();
+
+  public static RevisionsInfo fromText(String revisionsText) {
+    RevisionsInfo revisionsInfo = GSON.fromJson(revisionsText, RevisionsInfo.class);
+    if (revisionsInfo == null) {
+      return new RevisionsInfo();
+    }
+    return revisionsInfo;
+  }
+
+  public String toText() {
+    return GSON.toJson(this);
+  }
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java
new file mode 100644
index 0000000000..c04fa8df3c
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo.storage;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.model.*;
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * OSSOperator is a higher-level encapsulation of OSSClient,
+ * which makes OSSNotebookRepo shield from specific OSS operations
+ * or complex combinations of them.
+ */
+public class OSSOperator implements RemoteStorageOperator {
+  private OSS ossClient;
+
+  public OSSOperator(String endpoint, String accessKeyId, String accessKeySecret) {
+    this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
+  }
+
+
+  @Override
+  public void createBucket(String bucketName) {
+    ossClient.createBucket(bucketName);
+  }
+
+
+  @Override
+  public void deleteBucket(String bucketName) {
+    ossClient.deleteBucket(bucketName);
+  }
+
+  @Override
+  public boolean doesObjectExist(String bucketName, String key) throws IOException {
+    return ossClient.doesObjectExist(bucketName, key);
+  }
+
+
+  @Override
+  public String getTextObject(String bucketName, String key) throws IOException {
+    if (!doesObjectExist(bucketName, key)) {
+      throw new IOException("Note or its revision not found");
+    }
+    OSSObject ossObject = ossClient.getObject(bucketName, key);
+    InputStream in = null;
+    try {
+      in = ossObject.getObjectContent();
+      return IOUtils.toString(in, StandardCharsets.UTF_8);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+  }
+
+
+  @Override
+  public void putTextObject(String bucketName, String key, InputStream inputStream) {
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, inputStream);
+    ossClient.putObject(putObjectRequest);
+  }
+
+
+  @Override
+  public void moveObject(String bucketName, String sourceKey, String destKey) throws IOException {
+    if (!doesObjectExist(bucketName, sourceKey)) {
+      throw new IOException("Note or its revision not found");
+    }
+    CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName,
+            sourceKey, bucketName, destKey);
+    ossClient.copyObject(copyObjectRequest);
+    ossClient.deleteObject(bucketName, sourceKey);
+  }
+
+  @Override
+  public void moveDir(String bucketName, String sourceDir, String destDir) throws IOException {
+    List<String> objectKeys = listDirObjects(bucketName, sourceDir);
+    for (String key : objectKeys) {
+      moveObject(bucketName, key, destDir + key.substring(sourceDir.length()));
+    }
+  }
+
+
+  @Override
+  public void deleteDir(String bucketName, String dirname) {
+    List<String> keys = listDirObjects(bucketName, dirname);
+    deleteFiles(bucketName, keys);
+  }
+
+  @Override
+  public void deleteFile(String bucketName, String objectKey) throws IOException {
+    deleteFiles(bucketName, Arrays.asList(objectKey));
+  }
+
+  @Override
+  public void deleteFiles(String bucketName, List<String> objectKeys) {
+    if (objectKeys != null && objectKeys.size() > 0) {
+      DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(objectKeys);
+      ossClient.deleteObjects(deleteObjectsRequest);
+    }
+  }
+
+
+  @Override
+  public List<String> listDirObjects(String bucketName, String dirname) {
+    String nextMarker = null;
+    ObjectListing objectListing = null;
+    List<String> keys = new ArrayList<>();
+    do {
+      ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName)
+              .withPrefix(dirname)
+              .withMarker(nextMarker);
+      objectListing = ossClient.listObjects(listObjectsRequest);
+      if (!objectListing.getObjectSummaries().isEmpty()) {
+        for (OSSObjectSummary s : objectListing.getObjectSummaries()) {
+          keys.add(s.getKey());
+        }
+      }
+
+      nextMarker = objectListing.getNextMarker();
+    } while (objectListing.isTruncated());
+    return keys;
+  }
+
+  @Override
+  public void shutdown() {
+    ossClient.shutdown();
+  }
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java
new file mode 100644
index 0000000000..e3dfedcd07
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo.storage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public interface RemoteStorageOperator {
+  void createBucket(String bucketName) throws IOException;
+
+  void deleteBucket(String bucketName) throws IOException;
+
+  boolean doesObjectExist(String bucketName, String key) throws IOException;
+
+  String getTextObject(String bucketName, String key) throws IOException;
+
+  void putTextObject(String bucketName, String key, InputStream inputStream) throws IOException;
+
+  void moveObject(String bucketName, String sourceKey, String destKey) throws IOException;
+
+  void moveDir(String bucketName, String sourceDir, String destDir) throws IOException;
+
+  void deleteDir(String bucketName, String dirname) throws IOException;
+
+  void deleteFile(String bucketName, String objectKey) throws IOException;
+
+  void deleteFiles(String bucketName, List<String> objectKeys) throws IOException;
+
+  List<String> listDirObjects(String bucketName, String dirname);
+
+  void shutdown();
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java
new file mode 100644
index 0000000000..3aa8d6d196
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MockStorageOperator implements RemoteStorageOperator {
+
+  private String mockRootFolder;
+
+  public MockStorageOperator() throws IOException {
+    Path tempDirectory = Files.createTempDirectory("zeppelin_mock_storage_dir_");
+    mockRootFolder = tempDirectory.toString() + "/";
+  }
+
+  @Override
+  public void createBucket(String bucketName) throws IOException {
+    FileUtils.forceMkdir(new File(mockRootFolder + bucketName));
+  }
+
+  @Override
+  public void deleteBucket(String bucketName) throws IOException {
+    FileUtils.deleteDirectory(new File(mockRootFolder + bucketName));
+  }
+
+  @Override
+  public boolean doesObjectExist(String bucketName, String key) throws IOException {
+    File file = new File(mockRootFolder + bucketName + "/" + key);
+    return file.exists() && !file.isDirectory();
+  }
+
+  @Override
+  public String getTextObject(String bucketName, String key) throws IOException {
+    if (!doesObjectExist(bucketName, key)) {
+      throw new IOException("Note or its revision not found");
+    }
+    return FileUtils.readFileToString(new File(mockRootFolder + bucketName + "/" + key), "UTF-8");
+  }
+
+  @Override
+  public void putTextObject(String bucketName, String key, InputStream inputStream) throws IOException {
+    File destination = new File(mockRootFolder + bucketName + "/" + key);
+    destination.getParentFile().mkdirs();
+    FileUtils.copyInputStreamToFile(inputStream, destination);
+  }
+
+  @Override
+  public void moveObject(String bucketName, String sourceKey, String destKey) throws IOException {
+    FileUtils.moveFile(new File(mockRootFolder + bucketName + "/" + sourceKey),
+            new File(mockRootFolder + bucketName + "/" + destKey));
+  }
+
+  @Override
+  public void moveDir(String bucketName, String sourceDir, String destDir) throws IOException {
+    List<String> objectKeys = listDirObjects(bucketName, sourceDir);
+    for (String key : objectKeys) {
+      moveObject(bucketName, key, destDir + key.substring(sourceDir.length()));
+    }
+  }
+
+  @Override
+  public void deleteDir(String bucketName, String dirname) throws IOException {
+    List<String> keys = listDirObjects(bucketName, dirname);
+    deleteFiles(bucketName, keys);
+  }
+
+  @Override
+  public void deleteFile(String bucketName, String objectKey) throws IOException {
+    FileUtils.forceDelete(new File(mockRootFolder + bucketName + "/" + objectKey));
+  }
+
+  @Override
+  public void deleteFiles(String bucketName, List<String> objectKeys) throws IOException {
+    if (objectKeys != null && objectKeys.size() > 0) {
+      for (String objectKey : objectKeys) {
+        deleteFile(bucketName, objectKey);
+      }
+    }
+  }
+
+  @Override
+  public List<String> listDirObjects(String bucketName, String dirname) {
+    File directory = new File(mockRootFolder + bucketName + "/" + dirname);
+    if (!directory.isDirectory()) {
+      return new ArrayList<>();
+    }
+    Collection<File> files = FileUtils.listFiles(directory, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+    return files.stream().map(file -> file.getPath().substring((mockRootFolder + bucketName + "/").length())).collect(Collectors.toList());
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java
new file mode 100644
index 0000000000..a84598d0aa
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OSSNotebookRepoTest {
+
+  private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS;
+  private OSSNotebookRepo notebookRepo;
+  private RemoteStorageOperator ossOperator;
+  private String bucket;
+  private static int OSS_VERSION_MAX = 30;
+
+
+
+  @Before
+  public void setUp() throws IOException {
+    bucket = "zeppelin-test-bucket";
+    String endpoint = "yourEndpoint";
+    String accessKeyId = "yourAccessKeyId";
+    String accessKeySecret = "yourAccessKeySecret";
+    ossOperator = new MockStorageOperator();
+    ossOperator.createBucket(bucket);
+    notebookRepo = new OSSNotebookRepo();
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ENDPOINT.getVarName(),
+            endpoint);
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_BUCKET.getVarName(),
+            bucket);
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYID.getVarName(),
+            accessKeyId);
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET.getVarName(),
+            accessKeySecret);
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX.getVarName(),
+            OSS_VERSION_MAX + "");
+    notebookRepo.init(conf);
+    notebookRepo.setOssOperator(ossOperator);
+  }
+
+  @After
+  public void tearDown() throws InterruptedException, IOException {
+    ossOperator.deleteDir(bucket, "");
+    ossOperator.deleteBucket(bucket);
+    // The delete operations on OSS Service above has a delay.
+    // And it would affect setup of next test case if we do not wait for them to end.
+    Thread.sleep(1000);
+
+    // notebookRepo.close() would call ossOperator.shutdown()
+    if (notebookRepo != null) {
+      notebookRepo.close();
+    }
+  }
+
+  @Test
+  public void testNotebookRepo() throws IOException {
+    Map<String, NoteInfo> notesInfo = notebookRepo.list(anonymous);
+    assertEquals(0, notesInfo.size());
+
+    // create Note note1
+    Note note1 = new Note();
+    note1.setPath("/spark/note_1");
+    notebookRepo.save(note1, anonymous);
+
+    //
+    for (int i = 1; i <= OSS_VERSION_MAX + 3; i++) {
+      Paragraph p = new Paragraph(note1, null);
+      p.setText("text" + i);
+      p.setStatus(Job.Status.RUNNING);
+      p.setAuthenticationInfo(new AuthenticationInfo("anonymous", (String) null, "anonymous"));
+      note1.addParagraph(p);
+      notebookRepo.save(note1, anonymous);
+      notebookRepo.checkpoint(note1.getId(), note1.getPath(), "commit " + i, anonymous);
+    }
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(1, notesInfo.size());
+    assertEquals("/spark/note_1", notesInfo.get(note1.getId()).getPath());
+
+    // Get note1
+    Note noteFromRepo = notebookRepo.get(note1.getId(), note1.getPath(), anonymous);
+    assertEquals(note1.getName(), noteFromRepo.getName());
+
+    // Get non-existed note
+    try {
+      notebookRepo.get("invalid_id", "/invalid_path", anonymous);
+      fail("Should fail to get non-existed note1");
+    } catch (IOException e) {
+      assertEquals(e.getMessage(), "Note or its revision not found");
+    }
+
+    // create another Note note2
+    Note note2 = new Note();
+    note2.setPath("/spark/note_2");
+    notebookRepo.save(note2, anonymous);
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(2, notesInfo.size());
+    assertEquals("/spark/note_1", notesInfo.get(note1.getId()).getPath());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // move note1
+    notebookRepo.move(note1.getId(), note1.getPath(), "/spark2/note_1", anonymous);
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(2, notesInfo.size());
+    assertEquals("/spark2/note_1", notesInfo.get(note1.getId()).getPath());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // move folder
+    notebookRepo.move("/spark2", "/spark3", anonymous);
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(2, notesInfo.size());
+    assertEquals("/spark3/note_1", notesInfo.get(note1.getId()).getPath());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // delete note
+    notebookRepo.remove(note1.getId(), notesInfo.get(note1.getId()).getPath(), anonymous);
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(1, notesInfo.size());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // delete folder
+    notebookRepo.remove("/spark", anonymous);
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(0, notesInfo.size());
+  }
+
+
+  @Test
+  public void testNotebookRepoWithVersionControl() throws IOException {
+    Map<String, NoteInfo> notesInfo = notebookRepo.list(anonymous);
+    assertEquals(0, notesInfo.size());
+
+    // create Note note1
+    Note note1 = new Note();
+    note1.setPath("/version_control/note_1");
+
+    List<NotebookRepoWithVersionControl.Revision> revisionList = new ArrayList<>();
+
+    for (int i = 1; i <= OSS_VERSION_MAX + 3; i++) {
+      Paragraph p = new Paragraph(note1, null);
+      p.setText("text" + i);
+      p.setStatus(Job.Status.RUNNING);
+      p.setAuthenticationInfo(new AuthenticationInfo("anonymous", (String) null, "anonymous"));
+      note1.addParagraph(p);
+      notebookRepo.save(note1, anonymous);
+
+      // checkpoint
+      NotebookRepoWithVersionControl.Revision revision = notebookRepo.checkpoint(note1.getId(), note1.getPath(), "commit " + i, anonymous);
+      revisionList.add(revision);
+
+      List<NotebookRepoWithVersionControl.Revision> revisionsHistory = notebookRepo.revisionHistory(note1.getId(), note1.getPath(), anonymous);
+      // verify OSS_VERSION_MAX control
+      if (i <= OSS_VERSION_MAX) {
+        assertEquals(i, revisionsHistory.size());
+      } else {
+        assertEquals(OSS_VERSION_MAX, revisionsHistory.size());
+      }
+    }
+
+    // get note by non-existed revisionId
+    for (int i = 1; i <= 3; i++) {
+      try {
+        notebookRepo.get(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous);
+        fail("Should fail to get non-existed note1");
+      } catch (IOException e) {
+        assertEquals(e.getMessage(), "Note or its revision not found");
+      }
+    }
+
+    // get note by existed revisionId
+    for (int i = 4; i <= OSS_VERSION_MAX + 3; i++) {
+      Note note = notebookRepo.get(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous);
+      assertEquals(i, note.getParagraphs().size());
+    }
+
+    // revisionsHistory
+    List<NotebookRepoWithVersionControl.Revision> revisionsHistory = notebookRepo.revisionHistory(note1.getId(), note1.getPath(), anonymous);
+    for (int i = 0; i < revisionsHistory.size(); i++) {
+      assertEquals(revisionsHistory.get(i).id, revisionList.get(revisionList.size() - i - 1).id);
+      assertEquals(revisionsHistory.get(i).message, revisionList.get(revisionList.size() - i - 1).message);
+      assertEquals(revisionsHistory.get(i).time, revisionList.get(revisionList.size() - i - 1).time);
+    }
+
+
+    // Modify note to distinguish itself with last version
+    Paragraph p = new Paragraph(note1, null);
+    p.setText("text" + OSS_VERSION_MAX + 4);
+    p.setStatus(Job.Status.RUNNING);
+    p.setAuthenticationInfo(new AuthenticationInfo("anonymous", (String) null, "anonymous"));
+    note1.addParagraph(p);
+    notebookRepo.save(note1, anonymous);
+
+    assertEquals(notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size(), OSS_VERSION_MAX + 4);
+
+    // Assume OSS_VERSION_MAX = 30
+    // revert note to revision 31 , then to revision 32, then to revision 33, finally to revision 31
+    for (int i = OSS_VERSION_MAX + 1; i <= OSS_VERSION_MAX + 3; i++) {
+      notebookRepo.setNoteRevision(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous);
+      assertEquals(notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size(), i);
+    }
+
+    // finally revert note to revision 31
+    notebookRepo.setNoteRevision(note1.getId(), note1.getPath(), revisionList.get(OSS_VERSION_MAX).id, anonymous);
+    assertEquals(notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size(), OSS_VERSION_MAX + 1);
+
+    notebookRepo.remove("/version_control", anonymous);
+  }
+}