Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/02/21 00:29:16 UTC
zeppelin git commit: [ZEPPELIN-1859] Add MongoNotebookRepo
Repository: zeppelin
Updated Branches:
refs/heads/master 4fd85f773 -> 5e75145ac
[ZEPPELIN-1859] Add MongoNotebookRepo
### What is this PR for?
This PR adds Mongo notebook storage.
I made this feature for HA (High Availability).
As far as I know, S3 and Git storage are currently the only available methods for HA.
I manage an Ambari cluster in my lab, and Zeppelin is its most vulnerable part because a single server holds all Zeppelin notes.
Therefore, by deploying a MongoDB [replica set](https://docs.mongodb.com/manual/replication/) and using it as Zeppelin notebook storage, I would like to achieve HA.
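A replica set is reached through a single connection URI that lists several members. The helper below is a hypothetical sketch (the hostnames and the replica set name `rs0` are placeholders) that builds such a URI in the standard `mongodb://` connection string format. The resulting string would go into `ZEPPELIN_NOTEBOOK_MONGO_URI`, which `MongoNotebookRepo` hands to `new MongoClient(new MongoClientURI(...))`, leaving member discovery and failover to the MongoDB Java driver.

```java
import java.util.List;

/** Hypothetical helper: builds a MongoDB connection string that lists replica set members. */
public class ReplicaSetUri {

  // Joins the "host:port" seed list with commas and appends the replicaSet option.
  static String build(List<String> members, String replicaSetName) {
    if (members.isEmpty()) {
      throw new IllegalArgumentException("at least one replica set member is required");
    }
    // The "/" before "?" marks an empty database path, as the URI format expects.
    return "mongodb://" + String.join(",", members) + "/?replicaSet=" + replicaSetName;
  }

  public static void main(String[] args) {
    String uri = build(
        List.of("db1.example.com:27017", "db2.example.com:27017", "db3.example.com:27017"),
        "rs0");
    // Prints mongodb://db1.example.com:27017,db2.example.com:27017,db3.example.com:27017/?replicaSet=rs0
    System.out.println(uri);
  }
}
```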
#### How to use MongoDB as notebook storage
```sh
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.MongoNotebookRepo
```
or in `zeppelin-site.xml`:
```xml
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.MongoNotebookRepo</value>
<description>notebook persistence layer implementation</description>
</property>
```
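The storage setting is a fully-qualified class name, and the storage docs in this commit note that a comma-separated list of class names is also accepted. The sketch below assumes the classes are loaded reflectively by name; `Repo` and `DemoRepo` are hypothetical stand-ins for Zeppelin's types, and the no-arg constructor is a simplification (`MongoNotebookRepo` itself takes a `ZeppelinConfiguration`).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: turning a zeppelin.notebook.storage value into repo instances. */
public class StorageLoader {

  /** Stand-in for Zeppelin's NotebookRepo interface. */
  public interface Repo {
    String name();
  }

  /** Stand-in for a repo implementation with a public no-arg constructor. */
  public static class DemoRepo implements Repo {
    public String name() {
      return "demo";
    }
  }

  // Splits the comma-separated setting and instantiates each class by its fully-qualified name.
  static List<Repo> load(String storageSetting) {
    List<Repo> repos = new ArrayList<>();
    for (String className : storageSetting.split(",")) {
      try {
        Class<?> clazz = Class.forName(className.trim());
        repos.add((Repo) clazz.getDeclaredConstructor().newInstance());
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Cannot load storage class " + className.trim(), e);
      }
    }
    return repos;
  }

  public static void main(String[] args) {
    List<Repo> repos = load(DemoRepo.class.getName());
    System.out.println(repos.get(0).name()); // prints demo
  }
}
```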
#### Configurable environment variables
* `ZEPPELIN_NOTEBOOK_MONGO_URI` MongoDB connection URI
* `ZEPPELIN_NOTEBOOK_MONGO_DATABASE` Database name
* `ZEPPELIN_NOTEBOOK_MONGO_COLLECTION` Collection name
* `ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT` If `true`, automatically import your local notes. Default `false`
They can be configured in `zeppelin-site.xml` as well:
* `zeppelin.notebook.mongo.uri`
* `zeppelin.notebook.mongo.database`
* `zeppelin.notebook.mongo.collection`
* `zeppelin.notebook.mongo.autoimport`
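Each environment variable above is the property name uppercased with dots replaced by underscores. The sketch below resolves one option under an assumed precedence (environment variable, then `zeppelin-site.xml`, then the built-in default); the class and method names are hypothetical, while the defaults match the ones this commit registers in `ZeppelinConfiguration` (`mongodb://localhost`, `zeppelin`, `notes`, `false`).

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch of resolving one Mongo storage option. */
public class MongoOptionResolver {

  // "zeppelin.notebook.mongo.uri" -> "ZEPPELIN_NOTEBOOK_MONGO_URI"
  static String envName(String propertyName) {
    return propertyName.replace('.', '_').toUpperCase(Locale.ROOT);
  }

  // Assumed precedence: environment variable, then zeppelin-site.xml, then the default.
  static String resolve(String propertyName, String defaultValue,
      Map<String, String> env, Map<String, String> siteXml) {
    String fromEnv = env.get(envName(propertyName));
    if (fromEnv != null) {
      return fromEnv;
    }
    String fromXml = siteXml.get(propertyName);
    return fromXml != null ? fromXml : defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> siteXml = new HashMap<>();
    siteXml.put("zeppelin.notebook.mongo.database", "zepl");
    // System.getenv() supplies the real process environment.
    Map<String, String> env = System.getenv();
    System.out.println(resolve("zeppelin.notebook.mongo.uri", "mongodb://localhost", env, siteXml));
    System.out.println(resolve("zeppelin.notebook.mongo.database", "zeppelin", env, siteXml));
  }
}
```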
#### Future work
Using MongoDB's [oplog tailing](https://docs.mongodb.com/manual/core/replica-set-oplog/) might make a multi-server architecture possible.
### What type of PR is it?
[Feature]
### Todos
* [ ] - Write documentation for Mongo storage
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1859
### How should this be tested?
#### Install MongoDB (if you don't have it)
```sh
brew update
brew install mongodb
```
#### Build Zeppelin
```sh
mvn clean package -DskipTests
```
#### Run Zeppelin with Mongo storage
```sh
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.MongoNotebookRepo
export ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT=true
bin/zeppelin-daemon.sh restart
```
The default database and collection names are `zeppelin` and `notes`, respectively.
The `ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT` option automatically imports local notes that don't exist in MongoDB yet.
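Auto-import behaves like an unordered bulk insert (`insertMany` with `ordered(false)` in `MongoNotebookRepo`): a note whose ID already exists is skipped with a WARN log, and the remaining notes are still inserted. The in-memory model below is a hypothetical sketch in which a `Map` keyed by note ID stands in for the collection; the note IDs are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of auto-import with duplicate skipping. */
public class AutoImportSketch {

  // Inserts every local note whose ID is not yet in the collection; returns the skipped IDs.
  // As with an unordered bulk insert, a duplicate does not stop the remaining inserts.
  static List<String> importNotes(Map<String, String> collection, Map<String, String> localNotes) {
    List<String> skipped = new ArrayList<>();
    for (Map.Entry<String, String> note : localNotes.entrySet()) {
      // putIfAbsent keeps the existing document when the note ID (_id) is already taken
      if (collection.putIfAbsent(note.getKey(), note.getValue()) != null) {
        skipped.add(note.getKey());
        System.out.println("WARN Note " + note.getKey() + " not inserted since already exists in MongoDB");
      }
    }
    return skipped;
  }

  public static void main(String[] args) {
    Map<String, String> collection = new LinkedHashMap<>();
    collection.put("2A94M5J1Z", "{ version already in MongoDB }");
    Map<String, String> localNotes = new LinkedHashMap<>();
    localNotes.put("2A94M5J1Z", "{ local copy }");
    localNotes.put("2C3XYZ9QW", "{ local-only note }");
    System.out.println(importNotes(collection, localNotes)); // prints [2A94M5J1Z]
    System.out.println(collection.keySet()); // prints [2A94M5J1Z, 2C3XYZ9QW]
  }
}
```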
#### Check whether documents in MongoDB are updated
Create, update, and remove a note, then open the mongo shell:
```sh
mongo zeppelin
```
Then check that the state of the note is what you expect:
```js
db.notes.findOne({_id: '<NOTE_ID_THAT_YOU_WANT_TO_SEE>'})
```
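When comparing, note that the document in MongoDB keeps the status that was saved, while `documentToNote` adjusts the note when Zeppelin loads it: paragraphs stored as `PENDING` or `RUNNING` come back as `ABORT` (and application states other than `ERROR` become `UNLOADED`). The sketch below models only the paragraph rule; `Status` is a hypothetical stand-in enum, not Zeppelin's `Job.Status`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the paragraph-status rule applied when a note is loaded. */
public class LoadedStatusSketch {

  /** Stand-in for org.apache.zeppelin.scheduler.Job.Status. */
  enum Status { READY, PENDING, RUNNING, FINISHED, ERROR, ABORT }

  // Mirrors the loop in documentToNote(): unfinished paragraphs are marked ABORT on load.
  static Status onLoad(Status stored) {
    if (stored == Status.PENDING || stored == Status.RUNNING) {
      return Status.ABORT;
    }
    return stored;
  }

  public static void main(String[] args) {
    List<Status> stored = List.of(Status.FINISHED, Status.RUNNING, Status.PENDING, Status.ERROR);
    List<Status> loaded = new ArrayList<>();
    for (Status s : stored) {
      loaded.add(onLoad(s));
    }
    System.out.println(loaded); // prints [FINISHED, ABORT, ABORT, ERROR]
  }
}
```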
#### Confirm that the configuration works
```sh
export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.MongoNotebookRepo
export ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT=true
export ZEPPELIN_NOTEBOOK_MONGO_DATABASE=otherdb
export ZEPPELIN_NOTEBOOK_MONGO_COLLECTION=mynotes
export ZEPPELIN_NOTEBOOK_MONGO_URI=mongodb://localhost:27017
bin/zeppelin-daemon.sh restart
```
The collection `mynotes` should be created in db `otherdb`.
Let's check it!
```sh
mongo otherdb
db.mynotes.count()
```
The result should not be zero.
#### Confirm that the configuration from `zeppelin-site.xml` works
Open your `conf/zeppelin-site.xml` file (copy it from `zeppelin-site.xml.template` if you don't have one), and comment out the currently active `zeppelin.notebook.storage` property, for example:
```xml
<!--
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</value>
<description>notebook persistence layer implementation</description>
</property>
-->
```
Then add the lines below:
```xml
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.MongoNotebookRepo</value>
<description>notebook persistence layer implementation</description>
</property>
<property>
<name>zeppelin.notebook.mongo.uri</name>
<value>mongodb://localhost</value>
<description>MongoDB connection URI used to connect to a MongoDB database server</description>
</property>
<property>
<name>zeppelin.notebook.mongo.database</name>
<value>zepl</value>
<description>database name for notebook storage</description>
</property>
<property>
<name>zeppelin.notebook.mongo.collection</name>
<value>notes</value>
<description>collection name for notebook storage</description>
</property>
<property>
<name>zeppelin.notebook.mongo.autoimport</name>
<value>false</value>
<description>import local notes into MongoDB automatically on startup</description>
</property>
```
This time, we will import a note via `mongoimport`. Just in case, I made it possible to import a note from its JSON file.
```sh
cd $ZEPPELIN_HOME/notebook/<NOTE_ID_YOU_WANT_TO_IMPORT>
mongoimport --db zepl --collection notes --file note.json
```
Ensure that your environment variables are clean (just reopen your terminal if they are not), and restart Zeppelin:
```sh
bin/zeppelin-daemon.sh restart
```
Open a browser and go to `localhost:8080`. The note that you imported should be shown.
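The imported note shows up because `list()` first calls `syncId()`: documents whose `_id` is an ObjectId (as `mongoimport` creates them when the JSON has no `_id`) are re-inserted with their `id` field as `_id`, and the originals are deleted, since MongoDB does not allow updating `_id` in place. Below is a hypothetical in-memory sketch of that step, with documents as `Map`s and `ObjectIdStub` standing in for `org.bson.types.ObjectId`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory sketch of re-keying imported documents by note ID. */
public class SyncIdSketch {

  /** Stand-in for org.bson.types.ObjectId. */
  static final class ObjectIdStub {
    final String hex;

    ObjectIdStub(String hex) {
      this.hex = hex;
    }
  }

  // Re-keys every document whose _id is an ObjectIdStub by its "id" field; returns the count.
  static int syncId(List<Map<String, Object>> collection) {
    List<Map<String, Object>> oldDocs = new ArrayList<>();
    for (Map<String, Object> doc : collection) {
      if (doc.get("_id") instanceof ObjectIdStub) {
        oldDocs.add(doc);
      }
    }
    for (Map<String, Object> oldDoc : oldDocs) {
      Map<String, Object> newDoc = new LinkedHashMap<>(oldDoc);
      newDoc.put("_id", oldDoc.get("id")); // the note ID becomes the primary key
      collection.add(newDoc); // insert the re-keyed copy first...
      collection.removeIf(d -> d == oldDoc); // ...then delete the original by identity
    }
    return oldDocs.size();
  }

  public static void main(String[] args) {
    List<Map<String, Object>> collection = new ArrayList<>();
    Map<String, Object> imported = new LinkedHashMap<>();
    imported.put("_id", new ObjectIdStub("58a1b2c3d4e5f60718293a4b"));
    imported.put("id", "2A94M5J1Z");
    collection.add(imported);
    System.out.println(syncId(collection)); // prints 1
    System.out.println(collection.get(0).get("_id")); // prints 2A94M5J1Z
  }
}
```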
### Questions:
* Do the license files need an update? Maybe. I used [mongo-java-driver](https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver/3.4.1), which is licensed under *The Apache Software License, Version 2.0*
* Are there breaking changes for older versions? NO
* Does this need documentation? YES
Author: Jun Kim <i2...@gmail.com>
Closes #1826 from tae-jun/ZEPPELIN-1859 and squashes the following commits:
98282ae [Jun Kim] Add a documentation for MongoDB notebook storage
77947b8 [Jun Kim] Add license information of mongo-java-driver
08eee3d [Jun Kim] fix style check error
a4fba8c [Jun Kim] Add MongoNotebookRepo
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5e75145a
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5e75145a
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5e75145a
Branch: refs/heads/master
Commit: 5e75145ac84bf6fb844b90f3a466bca89a6fde9a
Parents: 4fd85f7
Author: Jun Kim <i2...@gmail.com>
Authored: Sun Feb 12 20:21:59 2017 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Feb 21 09:29:09 2017 +0900
----------------------------------------------------------------------
conf/zeppelin-env.sh.template | 4 +
conf/zeppelin-site.xml.template | 33 +++
docs/_includes/themes/zeppelin/_navigation.html | 4 +-
docs/storage/storage.md | 72 ++++-
zeppelin-distribution/src/bin_license/LICENSE | 1 +
zeppelin-zengine/pom.xml | 9 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 20 ++
.../notebook/repo/MongoNotebookRepo.java | 268 +++++++++++++++++++
8 files changed, 407 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index 46fd481..3dccca6 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -36,6 +36,10 @@
# export ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID # AWS KMS key ID
# export ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION # AWS KMS key region
# export ZEPPELIN_NOTEBOOK_S3_SSE # Server-side encryption enabled for notebooks
+# export ZEPPELIN_NOTEBOOK_MONGO_URI # MongoDB connection URI used to connect to a MongoDB database server. Default "mongodb://localhost"
+# export ZEPPELIN_NOTEBOOK_MONGO_DATABASE # Database name to store notebook. Default "zeppelin"
+# export ZEPPELIN_NOTEBOOK_MONGO_COLLECTION # Collection name to store notebook. Default "notes"
+# export ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT # If "true" import local notes under ZEPPELIN_NOTEBOOK_DIR on startup. Default "false"
# export ZEPPELIN_IDENT_STRING # A string representing this instance of zeppelin. $USER by default.
# export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0.
# export ZEPPELIN_INTERPRETER_LOCALREPO # Local repository for interpreter's additional dependency loading
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index abaff30..fa47e1b 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -182,6 +182,39 @@
</property>
-->
+<!-- MongoDB notebook storage -->
+<!--
+<property>
+ <name>zeppelin.notebook.storage</name>
+ <value>org.apache.zeppelin.notebook.repo.MongoNotebookRepo</value>
+ <description>notebook persistence layer implementation</description>
+</property>
+
+<property>
+ <name>zeppelin.notebook.mongo.uri</name>
+ <value>mongodb://localhost</value>
+ <description>MongoDB connection URI used to connect to a MongoDB database server</description>
+</property>
+
+<property>
+ <name>zeppelin.notebook.mongo.database</name>
+ <value>zeppelin</value>
+ <description>database name for notebook storage</description>
+</property>
+
+<property>
+ <name>zeppelin.notebook.mongo.collection</name>
+ <value>notes</value>
+ <description>collection name for notebook storage</description>
+</property>
+
+<property>
+ <name>zeppelin.notebook.mongo.autoimport</name>
+ <value>false</value>
+ <description>import local notes into MongoDB automatically on startup</description>
+</property>
+-->
+
<property>
<name>zeppelin.notebook.storage</name>
<value>org.apache.zeppelin.notebook.repo.GitNotebookRepo</value>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/docs/_includes/themes/zeppelin/_navigation.html
----------------------------------------------------------------------
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index b53e9e9..ec7abfa 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -101,6 +101,7 @@
<li><a href="{{BASE_PATH}}/storage/storage.html#notebook-storage-in-s3">S3 Storage</a></li>
<li><a href="{{BASE_PATH}}/storage/storage.html#notebook-storage-in-azure">Azure Storage</a></li>
<li><a href="{{BASE_PATH}}/storage/storage.html#storage-in-zeppelinhub">ZeppelinHub Storage</a></li>
+ <li><a href="{{BASE_PATH}}/storage/storage.html#notebook-storage-in-mongodb">MongoDB Storage</a></li>
<li role="separator" class="divider"></li>
<li class="title"><span><b>REST API</b><span></li>
<li><a href="{{BASE_PATH}}/rest-api/rest-interpreter.html">Interpreter API</a></li>
@@ -110,7 +111,7 @@
<li><a href="{{BASE_PATH}}/rest-api/rest-helium.html">Helium API</a></li>
<li role="separator" class="divider"></li>
<li class="title"><span><b>Security</b><span></li>
- <li><a href="{{BASE_PATH}}/security/shiroauthentication.html">Shiro Authentication</a></li>
+ <li><a href="{{BASE_PATH}}/security/shiroauthentication.html">Shiro Authentication</a></li>
<li><a href="{{BASE_PATH}}/security/notebook_authorization.html">Notebook Authorization</a></li>
<li><a href="{{BASE_PATH}}/security/datasource_authorization.html">Data Source Authorization</a></li>
<li role="separator" class="divider"></li>
@@ -139,4 +140,3 @@
</nav><!--/.navbar-collapse -->
</div>
</div>
-
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/docs/storage/storage.md
----------------------------------------------------------------------
diff --git a/docs/storage/storage.md b/docs/storage/storage.md
index 73388da..9deca3d 100644
--- a/docs/storage/storage.md
+++ b/docs/storage/storage.md
@@ -32,6 +32,7 @@ There are few notebook storage systems available for a use out of the box:
* all notes are saved in the notebook folder in your local File System - `VFSNotebookRepo`
* storage using Amazon S3 service - `S3NotebookRepo`
* storage using Azure service - `AzureNotebookRepo`
+ * storage using MongoDB - `MongoNotebookRepo`
Multiple storage systems can be used at the same time by providing a comma-separated list of the class-names in the configuration.
By default, only first two of them will be automatically kept in sync by Zeppelin.
@@ -184,7 +185,7 @@ Or using the following setting in **zeppelin-site.xml**:
```
</br>
-## Notebook Storage in Azure <a name="Azure"></a>
+## Notebook Storage in Azure <a name="Azure"></a>
Using `AzureNotebookRepo` you can connect your Zeppelin with your Azure account for notebook storage.
@@ -274,3 +275,72 @@ export ZEPPELINHUB_API_ADDRESS = address of ZeppelinHub service (e.g. https://ww
```
You can get more information on generating `token` and using authentication on the corresponding [help page](http://help.zeppelinhub.com/zeppelin_integration/#add-a-new-zeppelin-instance-and-generate-a-token).
+
+
+## Notebook Storage in MongoDB <a name="MongoDB"></a>
+Using `MongoNotebookRepo`, you can store your notebook in [MongoDB](https://www.mongodb.com/).
+
+### Why MongoDB?
+* **[High Availability (HA)](https://en.wikipedia.org/wiki/High_availability)** by a [replica set](https://docs.mongodb.com/manual/reference/glossary/#term-replica-set)
+* Separation of storage from the server
+
+### How to use
+You can use MongoDB as notebook storage by editing `zeppelin-env.sh` or `zeppelin-site.xml`.
+
+#### (Method 1) by editing `zeppelin-env.sh`
+Add a line below to `$ZEPPELIN_HOME/conf/zeppelin-env.sh`:
+
+```sh
+export ZEPPELIN_NOTEBOOK_STORAGE=org.apache.zeppelin.notebook.repo.MongoNotebookRepo
+```
+
+> *NOTE:* The default MongoDB connection URI is `mongodb://localhost`
+
+#### (Method 2) by editing `zeppelin-site.xml`
+Or, **uncomment** the lines below in `$ZEPPELIN_HOME/conf/zeppelin-site.xml`:
+
+```xml
+<property>
+ <name>zeppelin.notebook.storage</name>
+ <value>org.apache.zeppelin.notebook.repo.MongoNotebookRepo</value>
+ <description>notebook persistence layer implementation</description>
+</property>
+```
+
+And **comment out** the lines below:
+
+```xml
+<property>
+ <name>zeppelin.notebook.storage</name>
+ <value>org.apache.zeppelin.notebook.repo.GitNotebookRepo</value>
+ <description>versioned notebook persistence layer implementation</description>
+</property>
+```
+
+### Configurable Options
+
+You can configure the options below in `zeppelin-env.sh`.
+
+* `ZEPPELIN_NOTEBOOK_MONGO_URI` [MongoDB connection URI](https://docs.mongodb.com/manual/reference/connection-string/) used to connect to a MongoDB database server
+* `ZEPPELIN_NOTEBOOK_MONGO_DATABASE` Database name
+* `ZEPPELIN_NOTEBOOK_MONGO_COLLECTION` Collection name
+* `ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT` If `true`, import local notes (refer to description below for details)
+
+Or, you can configure them in `zeppelin-site.xml`. The corresponding option names are as follows:
+
+* `zeppelin.notebook.mongo.uri`
+* `zeppelin.notebook.mongo.database`
+* `zeppelin.notebook.mongo.collection`
+* `zeppelin.notebook.mongo.autoimport`
+
+#### Example configurations in `zeppelin-env.sh`
+
+```sh
+export ZEPPELIN_NOTEBOOK_MONGO_URI=mongodb://db1.example.com:27017
+export ZEPPELIN_NOTEBOOK_MONGO_DATABASE=myfancy
+export ZEPPELIN_NOTEBOOK_MONGO_COLLECTION=notebook
+export ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT=true
+```
+
+#### Import your local notes automatically
+By setting `ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT` to `true` (default `false`), you can import your local notes automatically when the Zeppelin daemon starts up. This feature is for easy migration from local file system storage to MongoDB storage. A note whose ID already exists in the collection will not be imported.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/zeppelin-distribution/src/bin_license/LICENSE
----------------------------------------------------------------------
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index d27f27d..9f0c74b 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -216,6 +216,7 @@ The following components are provided under Apache License.
(Apache 2.0) Scalatest 2.2.4 (org.scalatest:scalatest_2.10:2.2.4 - https://github.com/scalatest/scalatest)
(Apache 2.0) frontend-maven-plugin 1.3 (com.github.eirslett:frontend-maven-plugin:1.3 - https://github.com/eirslett/frontend-maven-plugin/blob/frontend-plugins-1.3/LICENSE
(Apache 2.0) frontend-plugin-core 1.3 (com.github.eirslett:frontend-plugin-core) - https://github.com/eirslett/frontend-maven-plugin/blob/frontend-plugins-1.3/LICENSE
+ (Apache 2.0) mongo-java-driver 3.4.1 (org.mongodb:mongo-java-driver:3.4.1) - https://github.com/mongodb/mongo-java-driver/blob/master/LICENSE.txt
========================================================================
MIT licenses
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index ff70fed..d5fef16 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -144,7 +144,7 @@
<artifactId>jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
-
+
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
@@ -284,6 +284,13 @@
<artifactId>commons-lang3</artifactId>
<version>${commons.lang3.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>3.4.1</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 0708719..81507ba 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -384,6 +384,22 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_S3_SSE);
}
+ public String getMongoUri() {
+ return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI);
+ }
+
+ public String getMongoDatabase() {
+ return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_DATABASE);
+ }
+
+ public String getMongoCollection() {
+ return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION);
+ }
+
+ public boolean getMongoAutoimport() {
+ return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT);
+ }
+
public String getInterpreterListPath() {
return getRelativeDir(String.format("%s/interpreter-list", getConfDir()));
}
@@ -595,6 +611,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null),
ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
+ ZEPPELIN_NOTEBOOK_MONGO_DATABASE("zeppelin.notebook.mongo.database", "zeppelin"),
+ ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"),
+ ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"),
+ ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false),
ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", GitNotebookRepo.class.getName()),
ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
// whether by default note is public or private
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e75145a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java
new file mode 100644
index 0000000..9502cf3
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java
@@ -0,0 +1,268 @@
+package org.apache.zeppelin.notebook.repo;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.bulk.BulkWriteError;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import static com.mongodb.client.model.Filters.eq;
+import static com.mongodb.client.model.Filters.type;
+import static com.mongodb.client.model.Filters.in;
+
+import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.UpdateOptions;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.NotebookImportDeserializer;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.ApplicationState;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.bson.BsonType;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Backend for storing Notebook on MongoDB
+ */
+public class MongoNotebookRepo implements NotebookRepo {
+ private static final Logger LOG = LoggerFactory.getLogger(MongoNotebookRepo.class);
+
+ private final ZeppelinConfiguration conf;
+ private final MongoClient mongo;
+ private final MongoDatabase db;
+ private final MongoCollection<Document> coll;
+
+ public MongoNotebookRepo(ZeppelinConfiguration conf) throws IOException {
+ this.conf = conf;
+
+ mongo = new MongoClient(new MongoClientURI(conf.getMongoUri()));
+ db = mongo.getDatabase(conf.getMongoDatabase());
+ coll = db.getCollection(conf.getMongoCollection());
+
+ if (conf.getMongoAutoimport()) {
+ // import local notes into MongoDB
+ insertFileSystemNotes();
+ }
+ }
+
+ /**
+ * If environment variable ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT is true,
+ * this method will insert local notes into MongoDB on startup.
+ * If a note already exists in MongoDB, skip it.
+ */
+ private void insertFileSystemNotes() throws IOException {
+ LinkedList<Document> docs = new LinkedList<>(); // docs to be imported
+ NotebookRepo vfsRepo = new VFSNotebookRepo(this.conf);
+ List<NoteInfo> infos = vfsRepo.list(null);
+ // collect notes to be imported
+ for (NoteInfo info : infos) {
+ Note note = vfsRepo.get(info.getId(), null);
+ Document doc = noteToDocument(note);
+ docs.add(doc);
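+ }
+
+ // insertMany() rejects an empty list, so there is nothing to do without local notes
+ if (docs.isEmpty()) {
+ vfsRepo.close();
+ return;
+ }
+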
+ /*
+ * 'ordered(false)' option allows to proceed bulk inserting even though
+ * there are duplicated documents. The duplicated documents will be skipped
+ * and print a WARN log.
+ */
+ try {
+ coll.insertMany(docs, new InsertManyOptions().ordered(false));
+ } catch (MongoBulkWriteException e) {
+ printDuplicatedException(e); //print duplicated document warning log
+ }
+
+ vfsRepo.close(); // it does nothing for now but maybe in the future...
+ }
+
+ /**
+ * MongoBulkWriteException contains error messages that inform
+ * which documents were duplicated. This method catches those ID and print them.
+ * @param e
+ */
+ private void printDuplicatedException(MongoBulkWriteException e) {
+ List<BulkWriteError> errors = e.getWriteErrors();
+ for (BulkWriteError error : errors) {
+ String msg = error.getMessage();
+ Pattern pattern = Pattern.compile("[A-Z0-9]{9}"); // regex for note ID
+ Matcher matcher = pattern.matcher(msg);
+ if (matcher.find()) { // if there were a note ID
+ String noteId = matcher.group();
+ LOG.warn("Note " + noteId + " not inserted since already exists in MongoDB");
+ }
+ }
+ }
+
+ @Override
+ public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
+ syncId();
+
+ List<NoteInfo> infos = new LinkedList<>();
+ MongoCursor<Document> cursor = coll.find().iterator();
+
+ try {
+ while (cursor.hasNext()) {
+ Document doc = cursor.next();
+ Note note = documentToNote(doc);
+ NoteInfo info = new NoteInfo(note);
+ infos.add(info);
+ }
+ } finally {
+ cursor.close(); // release the server-side cursor even if a conversion fails
+ }
+
+ return infos;
+ }
+
+ /**
+ * Find documents of which type of _id is object ID, and change it to note ID.
+ * Since updating _id field is not allowed, remove original documents and insert
+ * new ones with string _id(note ID)
+ */
+ private void syncId() {
+ // find documents whose id type is object id
+ MongoCursor<Document> cursor = coll.find(type("_id", BsonType.OBJECT_ID)).iterator();
+ List<ObjectId> oldDocIds = new LinkedList<>(); // document ids need to update
+ List<Document> updatedDocs = new LinkedList<>(); // new documents to be inserted
+
+ try {
+ while (cursor.hasNext()) {
+ Document doc = cursor.next();
+ // store original _id
+ ObjectId oldId = doc.getObjectId("_id");
+ oldDocIds.add(oldId);
+ // store the document with string _id (note id)
+ String noteId = doc.getString("id");
+ doc.put("_id", noteId);
+ updatedDocs.add(doc);
+ }
+ } finally {
+ cursor.close(); // close the cursor on every path, including the early exit below
+ }
+
+ // if there is no such document, exit
+ if (updatedDocs.isEmpty()) {
+ return;
+ }
+
+ coll.insertMany(updatedDocs);
+ coll.deleteMany(in("_id", oldDocIds));
+ }
+
+ /**
+ * Convert document to note
+ */
+ private Note documentToNote(Document doc) {
+ // document to JSON
+ String json = doc.toJson();
+ // JSON to note
+ Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
+ .create();
+ Note note = gson.fromJson(json, Note.class);
+
+ for (Paragraph p : note.getParagraphs()) {
+ if (p.getStatus() == Job.Status.PENDING || p.getStatus() == Job.Status.RUNNING) {
+ p.setStatus(Job.Status.ABORT);
+ }
+
+ List<ApplicationState> appStates = p.getAllApplicationStates();
+ if (appStates != null) {
+ for (ApplicationState app : appStates) {
+ if (app.getStatus() != ApplicationState.Status.ERROR) {
+ app.setStatus(ApplicationState.Status.UNLOADED);
+ }
+ }
+ }
+ }
+
+ return note;
+ }
+
+ /**
+ * Convert note to document
+ */
+ private Document noteToDocument(Note note) {
+ // note to JSON
+ Gson gson = new GsonBuilder().create();
+ String json = gson.toJson(note);
+ // JSON to document
+ Document doc = Document.parse(json);
+ // set object id as note id
+ doc.put("_id", note.getId());
+ return doc;
+ }
+
+ @Override
+ public Note get(String noteId, AuthenticationInfo subject) throws IOException {
+ Document doc = coll.find(eq("_id", noteId)).first();
+
+ if (doc == null) {
+ throw new IOException("Note " + noteId + " not found");
+ }
+
+ return documentToNote(doc);
+ }
+
+ @Override
+ public void save(Note note, AuthenticationInfo subject) throws IOException {
+ Document doc = noteToDocument(note);
+ coll.replaceOne(eq("_id", note.getId()), doc, new UpdateOptions().upsert(true));
+ }
+
+ @Override
+ public void remove(String noteId, AuthenticationInfo subject) throws IOException {
+ coll.deleteOne(eq("_id", noteId));
+ }
+
+ @Override
+ public void close() {
+ mongo.close();
+ }
+
+ @Override
+ public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
+ throws IOException {
+ // no-op
+ LOG.warn("Checkpoint feature isn't supported in {}", this.getClass().toString());
+ return Revision.EMPTY;
+ }
+
+ @Override
+ public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
+ LOG.warn("Get note revision feature isn't supported in {}", this.getClass().toString());
+ return null;
+ }
+
+ @Override
+ public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
+ LOG.warn("Get Note revisions feature isn't supported in {}", this.getClass().toString());
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject)
+ throws IOException {
+ // Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) {
+ LOG.warn("Method not implemented");
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
+ LOG.warn("Method not implemented");
+ }
+}