You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/04/25 03:55:24 UTC
[zeppelin] branch master updated: [ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308)
This is an automated email from the ASF dual-hosted git repository.
jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 239f8bf8b4 [ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308)
239f8bf8b4 is described below
commit 239f8bf8b409c6d94d9e38643e4051d685af6b5b
Author: Guanhua Li <gu...@foxmail.com>
AuthorDate: Mon Apr 25 11:55:15 2022 +0800
[ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308)
* OSSNotebookRepo support version control.
* format
* OSSNotebookRepo: remove dependency on ossclient
* Some optimization
* add test
* ADD MOCK service to mock OSSOperator in localFileSystem
* fix bug of rootFolder path in OSSNotebookRepo
* Update for reviews.
* Insert the license header into MockStorageOperator.java and RemoteStorageOperator.java.
* update doc
* configuration to disable version control in OSSNoteBookRepo
* update for reviews
* Update zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
To keep a naming convention.
Co-authored-by: Jongyoul Lee <jo...@gmail.com>
Co-authored-by: Jongyoul Lee <jo...@gmail.com>
---
docs/setup/storage/storage.md | 10 +
.../zeppelin/conf/ZeppelinConfiguration.java | 5 +
.../zeppelin/notebook/repo/OSSNotebookRepo.java | 290 +++++++++++++--------
.../zeppelin/notebook/repo/RevisionsInfo.java | 39 +++
.../notebook/repo/storage/OSSOperator.java | 153 +++++++++++
.../repo/storage/RemoteStorageOperator.java | 48 ++++
.../notebook/repo/MockStorageOperator.java | 122 +++++++++
.../notebook/repo/OSSNotebookRepoTest.java | 243 +++++++++++++++++
8 files changed, 799 insertions(+), 111 deletions(-)
diff --git a/docs/setup/storage/storage.md b/docs/setup/storage/storage.md
index dc85cba219..bbc9583728 100644
--- a/docs/setup/storage/storage.md
+++ b/docs/setup/storage/storage.md
@@ -454,6 +454,16 @@ And you should configure oss related properties in file **zeppelin-site.xml**.
<description>Access key secret for your OSS account</description>
</property>
+<property>
+ <name>zeppelin.notebook.oss.version.max</name>
+ <value>30</value>
+ <description>
+ Maximum number of note versions kept by OSSNotebookRepo.
+ This property is optional; the default value is 30.
+ A value of 0 means that version control in OSSNotebookRepo is disabled.
+ </description>
+</property>
+
```
Uncomment the next property for use OSSNotebookRepo class:
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 8021e20ad5..96a4b10de8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -500,6 +500,10 @@ public class ZeppelinConfiguration {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET);
}
+ public int getOSSNoteMaxVersionNum(){
+ return getInt(ConfVars.ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX);
+ }
+
public String getMongoUri() {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI);
}
@@ -973,6 +977,7 @@ public class ZeppelinConfiguration {
ZEPPELIN_NOTEBOOK_OSS_ENDPOINT("zeppelin.notebook.oss.endpoint", "http://oss-cn-hangzhou.aliyuncs.com"),
ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYID("zeppelin.notebook.oss.accesskeyid", null),
ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET("zeppelin.notebook.oss.accesskeysecret", null),
+ ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX("zeppelin.notebook.oss.version.max", 30),
ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null),
ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
index 81096dbc6f..5fddf9904b 100644
--- a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java
@@ -17,43 +17,33 @@
package org.apache.zeppelin.notebook.repo;
-import com.aliyun.oss.OSS;
-import com.aliyun.oss.OSSClientBuilder;
-import com.aliyun.oss.model.CopyObjectRequest;
-import com.aliyun.oss.model.DeleteObjectsRequest;
-import com.aliyun.oss.model.ListObjectsRequest;
-import com.aliyun.oss.model.OSSObject;
-import com.aliyun.oss.model.OSSObjectSummary;
-import com.aliyun.oss.model.ObjectListing;
-import com.aliyun.oss.model.PutObjectRequest;
-import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.repo.storage.OSSOperator;
+import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* NotebookRepo for Aliyun OSS (https://cn.aliyun.com/product/oss)
*/
-public class OSSNotebookRepo implements NotebookRepo {
+public class OSSNotebookRepo implements NotebookRepoWithVersionControl {
private static final Logger LOGGER = LoggerFactory.getLogger(OSSNotebookRepo.class);
- private OSS ossClient;
private String bucketName;
private String rootFolder;
+ private int maxVersionNumber;
+
+ // Use ossOperator instead of ossClient directly
+ private RemoteStorageOperator ossOperator;
public OSSNotebookRepo() {
}
@@ -63,144 +53,132 @@ public class OSSNotebookRepo implements NotebookRepo {
String endpoint = conf.getOSSEndpoint();
bucketName = conf.getOSSBucketName();
rootFolder = conf.getNotebookDir();
- // rootFolder is part of OSS key which doesn't start with '/'
- if (rootFolder.startsWith("/")) {
- rootFolder = rootFolder.substring(1);
- }
+ maxVersionNumber = conf.getOSSNoteMaxVersionNum();
+ // rootFolder is part of OSS key
+ rootFolder = formatPath(rootFolder);
String accessKeyId = conf.getOSSAccessKeyId();
String accessKeySecret = conf.getOSSAccessKeySecret();
- this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
+ this.ossOperator = new OSSOperator(endpoint, accessKeyId, accessKeySecret);
+ }
+
+ private static String formatPath(String path) {
+ // The path should not start with '/' or './' or './/'
+ // because it is not accepted by OSS service.
+ if (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ path = new File(path).getPath();
+ if (path.startsWith("./")) {
+ path = path.substring(2);
+ }
+ return path;
+ }
+
+ public void setOssOperator(RemoteStorageOperator ossOperator) {
+ this.ossOperator = ossOperator;
}
@Override
public Map<String, NoteInfo> list(AuthenticationInfo subject) throws IOException {
Map<String, NoteInfo> notesInfo = new HashMap<>();
- final int maxKeys = 200;
- String nextMarker = null;
- ObjectListing objectListing = null;
- do {
- objectListing = ossClient.listObjects(
- new ListObjectsRequest(bucketName)
- .withPrefix(rootFolder + "/")
- .withMarker(nextMarker)
- .withMaxKeys(maxKeys));
- List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
- for (OSSObjectSummary s : sums) {
- if (s.getKey().endsWith(".zpln")) {
- try {
- String noteId = getNoteId(s.getKey());
- String notePath = getNotePath(rootFolder, s.getKey());
- notesInfo.put(noteId, new NoteInfo(noteId, notePath));
- } catch (IOException e) {
- LOGGER.warn(e.getMessage());
- }
- } else {
- LOGGER.debug("Skip invalid note file: {}", s.getKey());
+ List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + "/");
+ for (String key : objectKeys) {
+ if (key.endsWith(".zpln")) {
+ try {
+ String noteId = getNoteId(key);
+ String notePath = getNotePath(rootFolder, key);
+ notesInfo.put(noteId, new NoteInfo(noteId, notePath));
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
}
+ } else {
+ LOGGER.debug("Skip invalid note file: {}", key);
}
- nextMarker = objectListing.getNextMarker();
- } while (objectListing.isTruncated());
-
+ }
return notesInfo;
}
+ public Note getByOSSPath(String noteId, String ossPath) throws IOException {
+ String noteText = ossOperator.getTextObject(bucketName, ossPath);
+ return Note.fromJson(noteId, noteText);
+ }
+
+
@Override
public Note get(String noteId, String notePath, AuthenticationInfo subject) throws IOException {
- OSSObject ossObject = ossClient.getObject(bucketName,
- rootFolder + "/" + buildNoteFileName(noteId, notePath));
- InputStream in = null;
- try {
- in = ossObject.getObjectContent();
- return Note.fromJson(noteId, IOUtils.toString(in, StandardCharsets.UTF_8));
- } finally {
- if (in != null) {
- in.close();
- }
- }
+ return getByOSSPath(noteId, rootFolder + "/" + buildNoteFileName(noteId, notePath));
}
@Override
public void save(Note note, AuthenticationInfo subject) throws IOException {
String content = note.toJson();
- PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName,
+ ossOperator.putTextObject(bucketName,
rootFolder + "/" + buildNoteFileName(note.getId(), note.getPath()),
new ByteArrayInputStream(content.getBytes()));
- ossClient.putObject(putObjectRequest);
}
@Override
public void move(String noteId, String notePath, String newNotePath,
AuthenticationInfo subject) throws IOException {
- String sourceKey = rootFolder + "/" + buildNoteFileName(noteId, notePath);
- String destKey = rootFolder + "/" + buildNoteFileName(noteId, newNotePath);
- CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName,
- sourceKey, bucketName, destKey);
- ossClient.copyObject(copyObjectRequest);
- ossClient.deleteObject(bucketName, sourceKey);
+ String noteSourceKey = rootFolder + "/" + buildNoteFileName(noteId, notePath);
+ String noteDestKey = rootFolder + "/" + buildNoteFileName(noteId, newNotePath);
+ ossOperator.moveObject(bucketName, noteSourceKey, noteDestKey);
+ String revisionSourceDirKey = rootFolder + "/" + buildRevisionsDirName(noteId, notePath);
+ String revisionDestDirKey = rootFolder + "/" + buildRevisionsDirName(noteId, newNotePath);
+ ossOperator.moveDir(bucketName, revisionSourceDirKey, revisionDestDirKey);
}
@Override
public void move(String folderPath, String newFolderPath, AuthenticationInfo subject) {
- final int maxKeys = 200;
- String nextMarker = null;
- ObjectListing objectListing = null;
- do {
- objectListing = ossClient.listObjects(
- new ListObjectsRequest(bucketName)
- .withPrefix(rootFolder + folderPath + "/")
- .withMarker(nextMarker)
- .withMaxKeys(maxKeys));
- List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
- for (OSSObjectSummary s : sums) {
- if (s.getKey().endsWith(".zpln")) {
- try {
- String noteId = getNoteId(s.getKey());
- String notePath = getNotePath(rootFolder, s.getKey());
- String newNotePath = newFolderPath + notePath.substring(folderPath.length());
- move(noteId, notePath, newNotePath, subject);
- } catch (IOException e) {
- LOGGER.warn(e.getMessage());
- }
- } else {
- LOGGER.debug("Skip invalid note file: {}", s.getKey());
+ List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + folderPath + "/");
+ for (String key : objectKeys) {
+ if (key.endsWith(".zpln")) {
+ try {
+ String noteId = getNoteId(key);
+ String notePath = getNotePath(rootFolder, key);
+ String newNotePath = newFolderPath + notePath.substring(folderPath.length());
+ move(noteId, notePath, newNotePath, subject);
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
}
+ } else {
+ LOGGER.debug("Skip invalid note file: {}", key);
}
- nextMarker = objectListing.getNextMarker();
- } while (objectListing.isTruncated());
+ }
}
@Override
public void remove(String noteId, String notePath, AuthenticationInfo subject)
- throws IOException {
- ossClient.deleteObject(bucketName, rootFolder + "/" + buildNoteFileName(noteId, notePath));
+ throws IOException {
+ ossOperator.deleteFile(bucketName, rootFolder + "/" + buildNoteFileName(noteId, notePath));
+ // if there is no file under the revisions directory, deleteDir() does nothing
+ ossOperator.deleteDir(bucketName, rootFolder + "/" + buildRevisionsDirName(noteId, notePath));
}
@Override
- public void remove(String folderPath, AuthenticationInfo subject) {
- String nextMarker = null;
- ObjectListing objectListing = null;
- do {
- ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName)
- .withPrefix(rootFolder + folderPath + "/")
- .withMarker(nextMarker);
- objectListing = ossClient.listObjects(listObjectsRequest);
- if (!objectListing.getObjectSummaries().isEmpty()) {
- List<String> keys = new ArrayList<>();
- for (OSSObjectSummary s : objectListing.getObjectSummaries()) {
- keys.add(s.getKey());
+ public void remove(String folderPath, AuthenticationInfo subject) throws IOException {
+ List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + folderPath + "/");
+ for (String key : objectKeys) {
+ if (key.endsWith(".zpln")) {
+ try {
+ String noteId = getNoteId(key);
+ String notePath = getNotePath(rootFolder, key);
+ // delete note revision file
+ ossOperator.deleteDir(bucketName, rootFolder + "/" + buildRevisionsDirName(noteId, notePath));
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage());
}
- DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(keys);
- ossClient.deleteObjects(deleteObjectsRequest);
}
-
- nextMarker = objectListing.getNextMarker();
- } while (objectListing.isTruncated());
+ }
+ // delete note file
+ ossOperator.deleteFiles(bucketName, objectKeys);
}
+
@Override
public void close() {
- ossClient.shutdown();
+ ossOperator.shutdown();
}
@Override
@@ -214,4 +192,94 @@ public class OSSNotebookRepo implements NotebookRepo {
LOGGER.warn("Method not implemented");
}
+
+ private static String buildRevisionsDirName(String noteId, String notePath) throws IOException {
+ if (!notePath.startsWith("/")) {
+ throw new IOException("Invalid notePath: " + notePath);
+ }
+ return ".checkpoint/" + (notePath + "_" + noteId).substring(1);
+ }
+
+ private String buildRevisionsInfoAbsolutePath(String noteId, String notePath) throws IOException {
+ return rootFolder + "/" + buildRevisionsDirName(noteId, notePath) + "/" + ".revision-info";
+ }
+
+ private String buildRevisionsFileAbsolutePath(String noteId, String notePath, String revisionId) throws IOException {
+ return rootFolder + "/" + buildRevisionsDirName(noteId, notePath) + "/" + revisionId;
+ }
+
+
+ @Override
+ public Revision checkpoint(String noteId, String notePath, String checkpointMsg, AuthenticationInfo subject) throws IOException {
+ if (maxVersionNumber <= 0) {
+ throw new IOException("Version control is closed because the value of zeppelin.notebook.oss.version.max is set to 0");
+ }
+
+ Note note = get(noteId, notePath, subject);
+
+ //1 Write note content to revision file
+ String revisionId = UUID.randomUUID().toString().replace("-", "");
+ String noteContent = note.toJson();
+ ossOperator.putTextObject(bucketName,
+ buildRevisionsFileAbsolutePath(noteId, notePath, revisionId),
+ new ByteArrayInputStream(noteContent.getBytes()));
+
+ //2 Append revision info
+ Revision revision = new Revision(revisionId, checkpointMsg, (int) (System.currentTimeMillis() / 1000L));
+ // check revision info file if existed
+ RevisionsInfo revisionsHistory = new RevisionsInfo();
+ String revisonInfoPath = buildRevisionsInfoAbsolutePath(noteId, notePath);
+ boolean found = ossOperator.doesObjectExist(bucketName, revisonInfoPath);
+ if (found) {
+ String existedRevisionsInfoText = ossOperator.getTextObject(bucketName, revisonInfoPath);
+ revisionsHistory = RevisionsInfo.fromText(existedRevisionsInfoText);
+ // limit the number of revision files: remove the oldest one once the limit is reached.
+ if (revisionsHistory.size() >= maxVersionNumber) {
+ Revision deletedRevision = revisionsHistory.removeLast();
+ ossOperator.deleteFile(bucketName, buildRevisionsFileAbsolutePath(noteId, notePath, deletedRevision.id));
+ }
+ }
+ revisionsHistory.addFirst(revision);
+
+ ossOperator.putTextObject(bucketName,
+ buildRevisionsInfoAbsolutePath(noteId, notePath),
+ new ByteArrayInputStream(revisionsHistory.toText().getBytes()));
+
+ return revision;
+ }
+
+ @Override
+ public Note get(String noteId, String notePath, String revId, AuthenticationInfo subject) throws IOException {
+ Note note = getByOSSPath(noteId, buildRevisionsFileAbsolutePath(noteId, notePath, revId));
+ if (note != null) {
+ note.setPath(notePath);
+ }
+ return note;
+ }
+
+ @Override
+ public List<Revision> revisionHistory(String noteId, String notePath, AuthenticationInfo subject) throws IOException {
+ if (maxVersionNumber <= 0) {
+ return new ArrayList<>();
+ }
+
+ List<Revision> revisions = new LinkedList<>();
+ String revisonInfoPath = buildRevisionsInfoAbsolutePath(noteId, notePath);
+ boolean found = ossOperator.doesObjectExist(bucketName, revisonInfoPath);
+ if (!found) {
+ return revisions;
+ }
+ String revisionsText = ossOperator.getTextObject(bucketName, revisonInfoPath);
+
+ return RevisionsInfo.fromText(revisionsText);
+ }
+
+ @Override
+ public Note setNoteRevision(String noteId, String notePath, String revId, AuthenticationInfo subject) throws IOException {
+ Note revisionNote = get(noteId, notePath, revId, subject);
+ if (revisionNote != null) {
+ save(revisionNote, subject);
+ }
+ return revisionNote;
+ }
}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java
new file mode 100644
index 0000000000..44b2aaa9cc
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import com.google.gson.Gson;
+
+import java.util.LinkedList;
+
+public class RevisionsInfo extends LinkedList<NotebookRepoWithVersionControl.Revision> {
+
+ private static Gson GSON = new Gson();
+
+ public static RevisionsInfo fromText(String revisionsText) {
+ RevisionsInfo revisionsInfo = GSON.fromJson(revisionsText, RevisionsInfo.class);
+ if (revisionsInfo == null) {
+ return new RevisionsInfo();
+ }
+ return revisionsInfo;
+ }
+
+ public String toText() {
+ return GSON.toJson(this);
+ }
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java
new file mode 100644
index 0000000000..c04fa8df3c
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo.storage;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.model.*;
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * OSSOperator is a higher-level encapsulation of OSSClient,
+ * which shields OSSNotebookRepo from specific OSS operations
+ * or complex combinations of them.
+ */
+public class OSSOperator implements RemoteStorageOperator {
+ private OSS ossClient;
+
+ public OSSOperator(String endpoint, String accessKeyId, String accessKeySecret) {
+ this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
+ }
+
+
+ @Override
+ public void createBucket(String bucketName) {
+ ossClient.createBucket(bucketName);
+ }
+
+
+ @Override
+ public void deleteBucket(String bucketName) {
+ ossClient.deleteBucket(bucketName);
+ }
+
+ @Override
+ public boolean doesObjectExist(String bucketName, String key) throws IOException {
+ return ossClient.doesObjectExist(bucketName, key);
+ }
+
+
+ @Override
+ public String getTextObject(String bucketName, String key) throws IOException {
+ if (!doesObjectExist(bucketName, key)) {
+ throw new IOException("Note or its revision not found");
+ }
+ OSSObject ossObject = ossClient.getObject(bucketName, key);
+ InputStream in = null;
+ try {
+ in = ossObject.getObjectContent();
+ return IOUtils.toString(in, StandardCharsets.UTF_8);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ }
+
+
+ @Override
+ public void putTextObject(String bucketName, String key, InputStream inputStream) {
+ PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, inputStream);
+ ossClient.putObject(putObjectRequest);
+ }
+
+
+ @Override
+ public void moveObject(String bucketName, String sourceKey, String destKey) throws IOException {
+ if (!doesObjectExist(bucketName, sourceKey)) {
+ throw new IOException("Note or its revision not found");
+ }
+ CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName,
+ sourceKey, bucketName, destKey);
+ ossClient.copyObject(copyObjectRequest);
+ ossClient.deleteObject(bucketName, sourceKey);
+ }
+
+ @Override
+ public void moveDir(String bucketName, String sourceDir, String destDir) throws IOException {
+ List<String> objectKeys = listDirObjects(bucketName, sourceDir);
+ for (String key : objectKeys) {
+ moveObject(bucketName, key, destDir + key.substring(sourceDir.length()));
+ }
+ }
+
+
+ @Override
+ public void deleteDir(String bucketName, String dirname) {
+ List<String> keys = listDirObjects(bucketName, dirname);
+ deleteFiles(bucketName, keys);
+ }
+
+ @Override
+ public void deleteFile(String bucketName, String objectKey) throws IOException {
+ deleteFiles(bucketName, Arrays.asList(objectKey));
+ }
+
+ @Override
+ public void deleteFiles(String bucketName, List<String> objectKeys) {
+ if (objectKeys != null && objectKeys.size() > 0) {
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(objectKeys);
+ ossClient.deleteObjects(deleteObjectsRequest);
+ }
+ }
+
+
+ @Override
+ public List<String> listDirObjects(String bucketName, String dirname) {
+ String nextMarker = null;
+ ObjectListing objectListing = null;
+ List<String> keys = new ArrayList<>();
+ do {
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName)
+ .withPrefix(dirname)
+ .withMarker(nextMarker);
+ objectListing = ossClient.listObjects(listObjectsRequest);
+ if (!objectListing.getObjectSummaries().isEmpty()) {
+ for (OSSObjectSummary s : objectListing.getObjectSummaries()) {
+ keys.add(s.getKey());
+ }
+ }
+
+ nextMarker = objectListing.getNextMarker();
+ } while (objectListing.isTruncated());
+ return keys;
+ }
+
+ @Override
+ public void shutdown() {
+ ossClient.shutdown();
+ }
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java
new file mode 100644
index 0000000000..e3dfedcd07
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo.storage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public interface RemoteStorageOperator {
+ void createBucket(String bucketName) throws IOException;
+
+ void deleteBucket(String bucketName) throws IOException;
+
+ boolean doesObjectExist(String bucketName, String key) throws IOException;
+
+ String getTextObject(String bucketName, String key) throws IOException;
+
+ void putTextObject(String bucketName, String key, InputStream inputStream) throws IOException;
+
+ void moveObject(String bucketName, String sourceKey, String destKey) throws IOException;
+
+ void moveDir(String bucketName, String sourceDir, String destDir) throws IOException;
+
+ void deleteDir(String bucketName, String dirname) throws IOException;
+
+ void deleteFile(String bucketName, String objectKey) throws IOException;
+
+ void deleteFiles(String bucketName, List<String> objectKeys) throws IOException;
+
+ List<String> listDirObjects(String bucketName, String dirname);
+
+ void shutdown();
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java
new file mode 100644
index 0000000000..3aa8d6d196
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MockStorageOperator implements RemoteStorageOperator {
+
+ private String mockRootFolder;
+
+ public MockStorageOperator() throws IOException {
+ Path tempDirectory = Files.createTempDirectory("zeppelin_mock_storage_dir_");
+ mockRootFolder = tempDirectory.toString() + "/";
+ }
+
+ @Override
+ public void createBucket(String bucketName) throws IOException {
+ FileUtils.forceMkdir(new File(mockRootFolder + bucketName));
+ }
+
+ @Override
+ public void deleteBucket(String bucketName) throws IOException {
+ FileUtils.deleteDirectory(new File(mockRootFolder + bucketName));
+ }
+
+ @Override
+ public boolean doesObjectExist(String bucketName, String key) throws IOException {
+ File file = new File(mockRootFolder + bucketName + "/" + key);
+ return file.exists() && !file.isDirectory();
+ }
+
+ @Override
+ public String getTextObject(String bucketName, String key) throws IOException {
+ if (!doesObjectExist(bucketName, key)) {
+ throw new IOException("Note or its revision not found");
+ }
+ return FileUtils.readFileToString(new File(mockRootFolder + bucketName + "/" + key), "UTF-8");
+ }
+
+ @Override
+ public void putTextObject(String bucketName, String key, InputStream inputStream) throws IOException {
+ File destination = new File(mockRootFolder + bucketName + "/" + key);
+ destination.getParentFile().mkdirs();
+ FileUtils.copyInputStreamToFile(inputStream, destination);
+ }
+
+ @Override
+ public void moveObject(String bucketName, String sourceKey, String destKey) throws IOException {
+ FileUtils.moveFile(new File(mockRootFolder + bucketName + "/" + sourceKey),
+ new File(mockRootFolder + bucketName + "/" + destKey));
+ }
+
+ @Override
+ public void moveDir(String bucketName, String sourceDir, String destDir) throws IOException {
+ List<String> objectKeys = listDirObjects(bucketName, sourceDir);
+ for (String key : objectKeys) {
+ moveObject(bucketName, key, destDir + key.substring(sourceDir.length()));
+ }
+ }
+
+ @Override
+ public void deleteDir(String bucketName, String dirname) throws IOException {
+ List<String> keys = listDirObjects(bucketName, dirname);
+ deleteFiles(bucketName, keys);
+ }
+
+ @Override
+ public void deleteFile(String bucketName, String objectKey) throws IOException {
+ FileUtils.forceDelete(new File(mockRootFolder + bucketName + "/" + objectKey));
+ }
+
+ @Override
+ public void deleteFiles(String bucketName, List<String> objectKeys) throws IOException {
+ if (objectKeys != null && objectKeys.size() > 0) {
+ for (String objectKey : objectKeys) {
+ deleteFile(bucketName, objectKey);
+ }
+ }
+ }
+
+ @Override
+ public List<String> listDirObjects(String bucketName, String dirname) {
+ File directory = new File(mockRootFolder + bucketName + "/" + dirname);
+ if (!directory.isDirectory()) {
+ return new ArrayList<>();
+ }
+ Collection<File> files = FileUtils.listFiles(directory, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
+ return files.stream().map(file -> file.getPath().substring((mockRootFolder + bucketName + "/").length())).collect(Collectors.toList());
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+}
diff --git a/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java
new file mode 100644
index 0000000000..a84598d0aa
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link OSSNotebookRepo} against a {@link MockStorageOperator} instead of a real OSS
+ * service: basic note CRUD/move operations and revision (checkpoint) handling, including the
+ * cap on the number of revisions kept per note.
+ */
+public class OSSNotebookRepoTest {
+
+  /** Revision cap handed to the repo through ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX in setUp. */
+  private static final int OSS_VERSION_MAX = 30;
+
+  private final AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS;
+  private OSSNotebookRepo notebookRepo;
+  private RemoteStorageOperator ossOperator;
+  private String bucket;
+
+  /**
+   * Creates the test bucket on the mock operator, publishes the OSS settings as system
+   * properties, initialises the repo and then swaps in the mock operator.
+   */
+  @Before
+  public void setUp() throws IOException {
+    bucket = "zeppelin-test-bucket";
+    String endpoint = "yourEndpoint";
+    String accessKeyId = "yourAccessKeyId";
+    String accessKeySecret = "yourAccessKeySecret";
+    ossOperator = new MockStorageOperator();
+    ossOperator.createBucket(bucket);
+    notebookRepo = new OSSNotebookRepo();
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ENDPOINT.getVarName(),
+        endpoint);
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_BUCKET.getVarName(),
+        bucket);
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYID.getVarName(),
+        accessKeyId);
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET.getVarName(),
+        accessKeySecret);
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX.getVarName(),
+        String.valueOf(OSS_VERSION_MAX));
+    notebookRepo.init(conf);
+    notebookRepo.setOssOperator(ossOperator);
+  }
+
+  /**
+   * Removes everything the test stored and closes the repo. The repo is closed in a
+   * {@code finally} block so that a failing cleanup step cannot leave it open.
+   */
+  @After
+  public void tearDown() throws InterruptedException, IOException {
+    try {
+      // ossOperator is still null when setUp failed before assigning it.
+      if (ossOperator != null) {
+        ossOperator.deleteDir(bucket, "");
+        ossOperator.deleteBucket(bucket);
+        // The delete operations on the OSS service above have a delay,
+        // and they would affect the setup of the next test case if we did not wait for them.
+        Thread.sleep(1000);
+      }
+    } finally {
+      // notebookRepo.close() would call ossOperator.shutdown()
+      if (notebookRepo != null) {
+        notebookRepo.close();
+      }
+    }
+  }
+
+  /** Covers list/save/get/move/remove for notes and folders. */
+  @Test
+  public void testNotebookRepo() throws IOException {
+    Map<String, NoteInfo> notesInfo = notebookRepo.list(anonymous);
+    assertEquals(0, notesInfo.size());
+
+    // create Note note1
+    Note note1 = new Note();
+    note1.setPath("/spark/note_1");
+    notebookRepo.save(note1, anonymous);
+
+    // grow note1 and checkpoint it more often than OSS_VERSION_MAX allows;
+    // the note must still be listed exactly once afterwards
+    for (int i = 1; i <= OSS_VERSION_MAX + 3; i++) {
+      addRunningParagraph(note1, "text" + i);
+      notebookRepo.save(note1, anonymous);
+      notebookRepo.checkpoint(note1.getId(), note1.getPath(), "commit " + i, anonymous);
+    }
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(1, notesInfo.size());
+    assertEquals("/spark/note_1", notesInfo.get(note1.getId()).getPath());
+
+    // Get note1
+    Note noteFromRepo = notebookRepo.get(note1.getId(), note1.getPath(), anonymous);
+    assertEquals(note1.getName(), noteFromRepo.getName());
+
+    // Get non-existed note
+    try {
+      notebookRepo.get("invalid_id", "/invalid_path", anonymous);
+      fail("Should fail to get non-existed note1");
+    } catch (IOException e) {
+      assertEquals("Note or its revision not found", e.getMessage());
+    }
+
+    // create another Note note2
+    Note note2 = new Note();
+    note2.setPath("/spark/note_2");
+    notebookRepo.save(note2, anonymous);
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(2, notesInfo.size());
+    assertEquals("/spark/note_1", notesInfo.get(note1.getId()).getPath());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // move note1
+    notebookRepo.move(note1.getId(), note1.getPath(), "/spark2/note_1", anonymous);
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(2, notesInfo.size());
+    assertEquals("/spark2/note_1", notesInfo.get(note1.getId()).getPath());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // move folder
+    notebookRepo.move("/spark2", "/spark3", anonymous);
+
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(2, notesInfo.size());
+    assertEquals("/spark3/note_1", notesInfo.get(note1.getId()).getPath());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // delete note
+    notebookRepo.remove(note1.getId(), notesInfo.get(note1.getId()).getPath(), anonymous);
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(1, notesInfo.size());
+    assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath());
+
+    // delete folder
+    notebookRepo.remove("/spark", anonymous);
+    notesInfo = notebookRepo.list(anonymous);
+    assertEquals(0, notesInfo.size());
+  }
+
+  /** Covers checkpoint, revision history, get-by-revision and setNoteRevision. */
+  @Test
+  public void testNotebookRepoWithVersionControl() throws IOException {
+    Map<String, NoteInfo> notesInfo = notebookRepo.list(anonymous);
+    assertEquals(0, notesInfo.size());
+
+    // create Note note1
+    Note note1 = new Note();
+    note1.setPath("/version_control/note_1");
+
+    // revisionList.get(i - 1) is the revision taken when note1 had i paragraphs
+    List<NotebookRepoWithVersionControl.Revision> revisionList = new ArrayList<>();
+
+    for (int i = 1; i <= OSS_VERSION_MAX + 3; i++) {
+      addRunningParagraph(note1, "text" + i);
+      notebookRepo.save(note1, anonymous);
+
+      // checkpoint
+      NotebookRepoWithVersionControl.Revision revision =
+          notebookRepo.checkpoint(note1.getId(), note1.getPath(), "commit " + i, anonymous);
+      revisionList.add(revision);
+
+      List<NotebookRepoWithVersionControl.Revision> historyAfterCheckpoint =
+          notebookRepo.revisionHistory(note1.getId(), note1.getPath(), anonymous);
+      // verify OSS_VERSION_MAX control: the history never grows beyond the cap
+      assertEquals(Math.min(i, OSS_VERSION_MAX), historyAfterCheckpoint.size());
+    }
+
+    // get note by non-existed revisionId:
+    // OSS_VERSION_MAX + 3 checkpoints were taken, so the three oldest ones are gone
+    for (int i = 1; i <= 3; i++) {
+      try {
+        notebookRepo.get(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous);
+        fail("Should fail to get non-existed note1");
+      } catch (IOException e) {
+        assertEquals("Note or its revision not found", e.getMessage());
+      }
+    }
+
+    // get note by existed revisionId
+    for (int i = 4; i <= OSS_VERSION_MAX + 3; i++) {
+      Note note =
+          notebookRepo.get(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous);
+      assertEquals(i, note.getParagraphs().size());
+    }
+
+    // revisionsHistory: entry i must match the i-th most recent checkpoint
+    List<NotebookRepoWithVersionControl.Revision> revisionsHistory =
+        notebookRepo.revisionHistory(note1.getId(), note1.getPath(), anonymous);
+    for (int i = 0; i < revisionsHistory.size(); i++) {
+      NotebookRepoWithVersionControl.Revision expected =
+          revisionList.get(revisionList.size() - i - 1);
+      NotebookRepoWithVersionControl.Revision actual = revisionsHistory.get(i);
+      assertEquals(expected.id, actual.id);
+      assertEquals(expected.message, actual.message);
+      assertEquals(expected.time, actual.time);
+    }
+
+    // Modify note to distinguish itself with last version.
+    // The sum is parenthesised so the text is "text34" rather than "text304".
+    addRunningParagraph(note1, "text" + (OSS_VERSION_MAX + 4));
+    notebookRepo.save(note1, anonymous);
+
+    assertEquals(OSS_VERSION_MAX + 4,
+        notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size());
+
+    // Assume OSS_VERSION_MAX = 30
+    // revert note to revision 31 , then to revision 32, then to revision 33, finally to revision 31
+    for (int i = OSS_VERSION_MAX + 1; i <= OSS_VERSION_MAX + 3; i++) {
+      notebookRepo.setNoteRevision(note1.getId(), note1.getPath(), revisionList.get(i - 1).id,
+          anonymous);
+      assertEquals(i,
+          notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size());
+    }
+
+    // finally revert note to revision 31
+    notebookRepo.setNoteRevision(note1.getId(), note1.getPath(),
+        revisionList.get(OSS_VERSION_MAX).id, anonymous);
+    assertEquals(OSS_VERSION_MAX + 1,
+        notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size());
+
+    notebookRepo.remove("/version_control", anonymous);
+  }
+
+  /** Appends a paragraph with the given text, RUNNING status and an "anonymous" user to note. */
+  private static void addRunningParagraph(Note note, String text) {
+    Paragraph p = new Paragraph(note, null);
+    p.setText(text);
+    p.setStatus(Job.Status.RUNNING);
+    p.setAuthenticationInfo(new AuthenticationInfo("anonymous", (String) null, "anonymous"));
+    note.addParagraph(p);
+  }
+}