You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/04/28 00:02:23 UTC
incubator-zeppelin git commit: ZEPPELIN-26 Pluggable notebook persistence layer
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master 87f28ab4b -> 4fa3db81d
ZEPPELIN-26 Pluggable notebook persistence layer
See https://issues.apache.org/jira/browse/ZEPPELIN-26
* [x] Notebook persistence layer abstraction
* [x] Make persistence layer implementation selection configurable
This PR abstracts the notebook persistence layer and provides one implementation based on commons-vfs.
Author: Lee moon soo <mo...@apache.org>
Closes #44 from Leemoonsoo/ZEPPELIN-26 and squashes the following commits:
c847d1c [Lee moon soo] Restore reference of note from paragraph
48fdd8e [Lee moon soo] Remove URI param from NotebookRepo constructor
44d6a6f [Lee moon soo] ZEPPELIN-26 let implementation configurable
1d63234 [Lee moon soo] Add notebook storage abstraction and implementation based on commons-vfs2
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/4fa3db81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/4fa3db81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/4fa3db81
Branch: refs/heads/master
Commit: 4fa3db81d393c1ae2fff555e651df327ec76f97a
Parents: 87f28ab
Author: Lee moon soo <mo...@apache.org>
Authored: Mon Apr 27 11:32:52 2015 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Apr 28 07:02:15 2015 +0900
----------------------------------------------------------------------
conf/zeppelin-site.xml.template | 8 +-
.../apache/zeppelin/server/ZeppelinServer.java | 11 +-
zeppelin-zengine/pom.xml | 24 ++
.../zeppelin/conf/ZeppelinConfiguration.java | 4 +-
.../java/org/apache/zeppelin/notebook/Note.java | 90 ++-----
.../org/apache/zeppelin/notebook/NoteInfo.java | 67 ++++++
.../org/apache/zeppelin/notebook/Notebook.java | 131 +++++-----
.../zeppelin/notebook/repo/NotebookRepo.java | 34 +++
.../zeppelin/notebook/repo/VFSNotebookRepo.java | 241 +++++++++++++++++++
.../apache/zeppelin/notebook/NotebookTest.java | 8 +-
10 files changed, 479 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index fae1104..9f773d5 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -49,7 +49,13 @@
<property>
<name>zeppelin.notebook.dir</name>
<value>notebook</value>
- <description>notebook persist</description>
+ <description>path or URI for notebook persist</description>
+</property>
+
+<property>
+ <name>zeppelin.notebook.storage</name>
+ <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</value>
+ <description>notebook persistence layer implementation</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index bd55b2d..0072b87 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.server;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
@@ -32,6 +33,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.rest.InterpreterRestApi;
import org.apache.zeppelin.rest.NotebookRestApi;
import org.apache.zeppelin.rest.ZeppelinRestApi;
@@ -71,6 +73,8 @@ public class ZeppelinServer extends Application {
private InterpreterFactory replFactory;
+ private NotebookRepo notebookRepo;
+
public static void main(String[] args) throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
conf.setProperty("args", args);
@@ -299,7 +303,12 @@ public class ZeppelinServer extends Application {
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookServer);
- notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer);
+ Class<?> notebookStorageClass = getClass().forName(
+ conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE));
+ Constructor<?> constructor = notebookStorageClass.getConstructor(
+ ZeppelinConfiguration.class);
+ this.notebookRepo = (NotebookRepo) constructor.newInstance(conf);
+ notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index b90847b..6a2c00a 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -77,6 +77,30 @@
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-vfs2</artifactId>
+ <version>2.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-webdav</artifactId>
+ <version>1.5.2</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>3.1</version>
+ </dependency>
+
+ <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 580860a..bbf46fc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
+import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -328,7 +329,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
}
public String getNotebookDir() {
- return getRelativeDir(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
+ return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
}
public String getInterpreterDir() {
@@ -392,6 +393,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
+ ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"),
// Decide when new note is created, interpreter settings will be binded automatically or not.
ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true);
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index b5e68a4..46b4c1a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -17,9 +17,6 @@
package org.apache.zeppelin.notebook;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
@@ -28,27 +25,21 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
-import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
/**
* Binded interpreters for a note
*/
@@ -63,6 +54,7 @@ public class Note implements Serializable, JobListener {
private transient NoteInterpreterLoader replLoader;
private transient ZeppelinConfiguration conf;
private transient JobListenerFactory jobListenerFactory;
+ private transient NotebookRepo repo;
/**
* note configurations.
@@ -78,11 +70,13 @@ public class Note implements Serializable, JobListener {
*/
private Map<String, Object> info = new HashMap<String, Object>();
+
public Note() {}
- public Note(ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
- JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched) {
- this.conf = conf;
+ public Note(NotebookRepo repo,
+ NoteInterpreterLoader replLoader,
+ JobListenerFactory jobListenerFactory) {
+ this.repo = repo;
this.replLoader = replLoader;
this.jobListenerFactory = jobListenerFactory;
generateId();
@@ -112,8 +106,20 @@ public class Note implements Serializable, JobListener {
this.replLoader = replLoader;
}
- public void setZeppelinConfiguration(ZeppelinConfiguration conf) {
- this.conf = conf;
+ public JobListenerFactory getJobListenerFactory() {
+ return jobListenerFactory;
+ }
+
+ public void setJobListenerFactory(JobListenerFactory jobListenerFactory) {
+ this.jobListenerFactory = jobListenerFactory;
+ }
+
+ public NotebookRepo getNotebookRepo() {
+ return repo;
+ }
+
+ public void setNotebookRepo(NotebookRepo repo) {
+ this.repo = repo;
}
public Map<String, List<AngularObject>> getAngularObjects() {
@@ -294,61 +300,11 @@ public class Note implements Serializable, JobListener {
}
public void persist() throws IOException {
- GsonBuilder gsonBuilder = new GsonBuilder();
- gsonBuilder.setPrettyPrinting();
- Gson gson = gsonBuilder.create();
-
- File dir = new File(conf.getNotebookDir() + "/" + id);
- if (!dir.exists()) {
- dir.mkdirs();
- } else if (dir.isFile()) {
- throw new RuntimeException("File already exists" + dir.toString());
- }
-
- File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
- logger().info("Persist note {} into {}", id, file.getAbsolutePath());
-
- snapshotAngularObjectRegistry();
- String json = gson.toJson(this);
- FileOutputStream out = new FileOutputStream(file);
- out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)));
- out.close();
+ repo.save(this);
}
public void unpersist() throws IOException {
- File dir = new File(conf.getNotebookDir() + "/" + id);
-
- FileUtils.deleteDirectory(dir);
- }
-
- public static Note load(String id, ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
- Scheduler scheduler, JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched)
- throws IOException {
- GsonBuilder gsonBuilder = new GsonBuilder();
- gsonBuilder.setPrettyPrinting();
- Gson gson = gsonBuilder.create();
-
- File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
- logger().info("Load note {} from {}", id, file.getAbsolutePath());
-
- if (!file.isFile()) {
- return null;
- }
-
- FileInputStream ins = new FileInputStream(file);
- String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING));
- Note note = gson.fromJson(json, Note.class);
- note.setZeppelinConfiguration(conf);
- note.setReplLoader(replLoader);
- note.jobListenerFactory = jobListenerFactory;
- for (Paragraph p : note.paragraphs) {
- p.setNote(note);
-
- if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
- p.setStatus(Status.ABORT);
- }
- }
- return note;
+ repo.remove(id());
}
public Map<String, Object> getConfig() {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java
new file mode 100644
index 0000000..db07e50
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+public class NoteInfo {
+ String id;
+ String name;
+ private Map<String, Object> config = new HashMap<String, Object>();
+
+ public NoteInfo(String id, String name, Map<String, Object> config) {
+ super();
+ this.id = id;
+ this.name = name;
+ this.config = config;
+ }
+
+ public NoteInfo(Note note) {
+ id = note.id();
+ name = note.getName();
+ config = note.getConfig();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Map<String, Object> getConfig() {
+ return config;
+ }
+
+ public void setConfig(Map<String, Object> config) {
+ this.config = config;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 844763f..1b29509 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.notebook;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,11 +31,9 @@ import java.util.Map;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterFactory;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
@@ -64,11 +61,14 @@ public class Notebook {
private StdSchedulerFactory quertzSchedFact;
private org.quartz.Scheduler quartzSched;
private JobListenerFactory jobListenerFactory;
+ private NotebookRepo notebookRepo;
- public Notebook(ZeppelinConfiguration conf, SchedulerFactory schedulerFactory,
+ public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo,
+ SchedulerFactory schedulerFactory,
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException,
SchedulerException {
this.conf = conf;
+ this.notebookRepo = notebookRepo;
this.schedulerFactory = schedulerFactory;
this.replFactory = replFactory;
this.jobListenerFactory = jobListenerFactory;
@@ -102,7 +102,7 @@ public class Notebook {
*/
public Note createNote(List<String> interpreterIds) throws IOException {
NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory);
- Note note = new Note(conf, intpLoader, jobListenerFactory, quartzSched);
+ Note note = new Note(notebookRepo, intpLoader, jobListenerFactory);
intpLoader.setNoteId(note.id());
synchronized (notes) {
notes.put(note.id(), note);
@@ -111,6 +111,7 @@ public class Notebook {
bindInterpretersToNote(note.id(), interpreterIds);
}
+ note.persist();
return note;
}
@@ -159,79 +160,75 @@ public class Notebook {
}
}
- private void loadAllNotes() throws IOException {
- File notebookDir = new File(conf.getNotebookDir());
- File[] dirs = notebookDir.listFiles();
- if (dirs == null) {
- return;
+ private Note loadNoteFromRepo(String id) {
+ Note note = null;
+ try {
+ note = notebookRepo.get(id);
+ } catch (IOException e) {
+ logger.error("Failed to load " + id, e);
+ }
+ if (note == null) {
+ return null;
}
- Map<String, SnapshotAngularObject> angularObjectSnapshot =
- new HashMap<String, SnapshotAngularObject>();
+ // set NoteInterpreterLoader
+ NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader(
+ replFactory);
+ note.setReplLoader(noteInterpreterLoader);
+ noteInterpreterLoader.setNoteId(note.id());
- for (File f : dirs) {
- boolean isHidden = f.getName().startsWith(".");
- if (f.isDirectory() && !isHidden) {
- Scheduler scheduler =
- schedulerFactory.createOrGetFIFOScheduler("note_" + System.currentTimeMillis());
- logger.info("Loading note from " + f.getName());
- NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader(replFactory);
- Note note = Note.load(f.getName(),
- conf,
- noteInterpreterLoader,
- scheduler,
- jobListenerFactory, quartzSched);
- noteInterpreterLoader.setNoteId(note.id());
-
- // restore angular object --------------
- Date lastUpdatedDate = new Date(0);
- for (Paragraph p : note.getParagraphs()) {
- if (p.getDateFinished() != null &&
- lastUpdatedDate.before(p.getDateFinished())) {
- lastUpdatedDate = p.getDateFinished();
- }
- }
+ // set JobListenerFactory
+ note.setJobListenerFactory(jobListenerFactory);
- Map<String, List<AngularObject>> savedObjects = note.getAngularObjects();
-
- if (savedObjects != null) {
- for (String intpGroupName : savedObjects.keySet()) {
- List<AngularObject> objectList = savedObjects.get(intpGroupName);
-
- for (AngularObject savedObject : objectList) {
- SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName());
- if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) {
- angularObjectSnapshot.put(
- savedObject.getName(),
- new SnapshotAngularObject(
- intpGroupName,
- savedObject,
- lastUpdatedDate));
- }
- }
- }
- }
+ // set notebookRepo
+ note.setNotebookRepo(notebookRepo);
- synchronized (notes) {
- notes.put(note.id(), note);
- refreshCron(note.id());
- }
+ Map<String, SnapshotAngularObject> angularObjectSnapshot =
+ new HashMap<String, SnapshotAngularObject>();
+
+ // restore angular object --------------
+ Date lastUpdatedDate = new Date(0);
+ for (Paragraph p : note.getParagraphs()) {
+ p.setNote(note);
+ if (p.getDateFinished() != null &&
+ lastUpdatedDate.before(p.getDateFinished())) {
+ lastUpdatedDate = p.getDateFinished();
}
}
- for (String name : angularObjectSnapshot.keySet()) {
- SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
- List<InterpreterSetting> settings = replFactory.get();
- for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getInterpreterGroup();
- if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
- AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
- if (registry.get(name) == null) {
- registry.add(name, snapshot.getAngularObject().get(), false);
+ Map<String, List<AngularObject>> savedObjects = note.getAngularObjects();
+
+ if (savedObjects != null) {
+ for (String intpGroupName : savedObjects.keySet()) {
+ List<AngularObject> objectList = savedObjects.get(intpGroupName);
+
+ for (AngularObject savedObject : objectList) {
+ SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName());
+ if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) {
+ angularObjectSnapshot.put(
+ savedObject.getName(),
+ new SnapshotAngularObject(
+ intpGroupName,
+ savedObject,
+ lastUpdatedDate));
}
}
}
}
+
+ synchronized (notes) {
+ notes.put(note.id(), note);
+ refreshCron(note.id());
+ }
+ return note;
+ }
+
+ private void loadAllNotes() throws IOException {
+ List<NoteInfo> noteInfos = notebookRepo.list();
+
+ for (NoteInfo info : noteInfos) {
+ loadNoteFromRepo(info.getId());
+ }
}
class SnapshotAngularObject {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java
new file mode 100644
index 0000000..07e0875
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+
+/**
+ * Notebook repository (persistence layer) abstraction
+ */
+public interface NotebookRepo {
+ public List<NoteInfo> list() throws IOException;
+ public Note get(String noteId) throws IOException;
+ public void save(Note note) throws IOException;
+ public void remove(String noteId) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
new file mode 100644
index 0000000..3039f80
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.repo;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.vfs2.FileContent;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemManager;
+import org.apache.commons.vfs2.FileType;
+import org.apache.commons.vfs2.NameScope;
+import org.apache.commons.vfs2.Selectors;
+import org.apache.commons.vfs2.VFS;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+*
+*/
+public class VFSNotebookRepo implements NotebookRepo {
+ Logger logger = LoggerFactory.getLogger(VFSNotebookRepo.class);
+
+ private FileSystemManager fsManager;
+ private URI filesystemRoot;
+
+ private ZeppelinConfiguration conf;
+
+ public VFSNotebookRepo(ZeppelinConfiguration conf) throws IOException {
+ this.conf = conf;
+
+ try {
+ filesystemRoot = new URI(conf.getNotebookDir());
+ } catch (URISyntaxException e1) {
+ throw new IOException(e1);
+ }
+
+ if (filesystemRoot.getScheme() == null) { // it is local path
+ try {
+ this.filesystemRoot = new URI(new File(
+ conf.getRelativeDir(filesystemRoot.getPath())).getAbsolutePath());
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ } else {
+ this.filesystemRoot = filesystemRoot;
+ }
+ fsManager = VFS.getManager();
+ }
+
+ private String getPath(String path) {
+ if (path == null || path.trim().length() == 0) {
+ return filesystemRoot.toString();
+ }
+ if (path.startsWith("/")) {
+ return filesystemRoot.toString() + path;
+ } else {
+ return filesystemRoot.toString() + "/" + path;
+ }
+ }
+
+ private boolean isDirectory(FileObject fo) throws IOException {
+ if (fo == null) return false;
+ if (fo.getType() == FileType.FOLDER) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public List<NoteInfo> list() throws IOException {
+ FileObject rootDir = getRootDir();
+
+ FileObject[] children = rootDir.getChildren();
+
+ List<NoteInfo> infos = new LinkedList<NoteInfo>();
+ for (FileObject f : children) {
+ String fileName = f.getName().getBaseName();
+ if (f.isHidden()
+ || fileName.startsWith(".")
+ || fileName.startsWith("#")
+ || fileName.startsWith("~")) {
+ // skip hidden, temporary files
+ continue;
+ }
+
+ if (!isDirectory(f)) {
+ // currently single note is saved like, [NOTE_ID]/note.json.
+ // so it must be a directory
+ continue;
+ }
+
+ NoteInfo info = null;
+
+ try {
+ info = getNoteInfo(f);
+ if (info != null) {
+ infos.add(info);
+ }
+ } catch (IOException e) {
+ logger.error("Can't read note " + f.getName().toString(), e);
+ }
+ }
+
+ return infos;
+ }
+
+ private Note getNote(FileObject noteDir) throws IOException {
+ if (!isDirectory(noteDir)) {
+ throw new IOException(noteDir.getName().toString() + " is not a directory");
+ }
+
+ FileObject noteJson = noteDir.resolveFile("note.json", NameScope.CHILD);
+ if (!noteJson.exists()) {
+ throw new IOException(noteJson.getName().toString() + " not found");
+ }
+
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.setPrettyPrinting();
+ Gson gson = gsonBuilder.create();
+
+ FileContent content = noteJson.getContent();
+ InputStream ins = content.getInputStream();
+ String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING));
+ ins.close();
+
+ Note note = gson.fromJson(json, Note.class);
+// note.setReplLoader(replLoader);
+// note.jobListenerFactory = jobListenerFactory;
+
+ for (Paragraph p : note.getParagraphs()) {
+ if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
+ p.setStatus(Status.ABORT);
+ }
+ }
+
+ return note;
+ }
+
+ private NoteInfo getNoteInfo(FileObject noteDir) throws IOException {
+ Note note = getNote(noteDir);
+ return new NoteInfo(note);
+ }
+
+ @Override
+ public Note get(String noteId) throws IOException {
+ FileObject rootDir = fsManager.resolveFile(getPath("/"));
+ FileObject noteDir = rootDir.resolveFile(noteId, NameScope.CHILD);
+
+ return getNote(noteDir);
+ }
+
+ private FileObject getRootDir() throws IOException {
+ FileObject rootDir = fsManager.resolveFile(getPath("/"));
+
+ if (!rootDir.exists()) {
+ throw new IOException("Root path does not exists");
+ }
+
+ if (!isDirectory(rootDir)) {
+ throw new IOException("Root path is not a directory");
+ }
+
+ return rootDir;
+ }
+
+ @Override
+ public void save(Note note) throws IOException {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.setPrettyPrinting();
+ Gson gson = gsonBuilder.create();
+ String json = gson.toJson(note);
+
+ FileObject rootDir = getRootDir();
+
+ FileObject noteDir = rootDir.resolveFile(note.id(), NameScope.CHILD);
+
+ if (!noteDir.exists()) {
+ noteDir.createFolder();
+ }
+ if (!isDirectory(noteDir)) {
+ throw new IOException(noteDir.getName().toString() + " is not a directory");
+ }
+
+ FileObject noteJson = noteDir.resolveFile("note.json", NameScope.CHILD);
+ // false means not appending. creates file if not exists
+ OutputStream out = noteJson.getContent().getOutputStream(false);
+ out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)));
+ out.close();
+ }
+
+ @Override
+ public void remove(String noteId) throws IOException {
+ FileObject rootDir = fsManager.resolveFile(getPath("/"));
+ FileObject noteDir = rootDir.resolveFile(noteId, NameScope.CHILD);
+
+ if (!noteDir.exists()) {
+ // nothing to do
+ return;
+ }
+
+ if (!isDirectory(noteDir)) {
+ // it is not look like zeppelin note savings
+ throw new IOException("Can not remove " + noteDir.getName().toString());
+ }
+
+ noteDir.delete(Selectors.SELECT_SELF_AND_CHILDREN);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 8d2c65a..0d4d111 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -32,6 +32,8 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
+import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@@ -48,6 +50,7 @@ public class NotebookTest implements JobListenerFactory{
private SchedulerFactory schedulerFactory;
private File notebookDir;
private Notebook notebook;
+ private NotebookRepo notebookRepo;
private InterpreterFactory factory;
@Before
@@ -71,7 +74,8 @@ public class NotebookTest implements JobListenerFactory{
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
- notebook = new Notebook(conf, schedulerFactory, factory, this);
+ notebookRepo = new VFSNotebookRepo(conf);
+ notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
}
@After
@@ -108,7 +112,7 @@ public class NotebookTest implements JobListenerFactory{
p1.setText("hello world");
note.persist();
- Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf, null), this);
+ Notebook notebook2 = new Notebook(conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this);
assertEquals(1, notebook2.getAllNotes().size());
}