Posted to commits@zeppelin.apache.org by jo...@apache.org on 2016/08/16 09:28:20 UTC

zeppelin git commit: [ZEPPELIN-1294] Implement one-way sync for notebook repos

Repository: zeppelin
Updated Branches:
  refs/heads/master d794f6fdb -> 37696ea8b


[ZEPPELIN-1294] Implement one-way sync for notebook repos

### What is this PR for?
Currently, if two notebook storages are configured, the sync process treats them equally and can pull changes from either one. This can cause confusion when there is a clear distinction between primary and secondary storage and the primary storage is managed outside of Zeppelin itself, such as a local folder under Git control. If a notebook is deleted or reverted to an older version in the primary storage behind Zeppelin's back, the sync process will overwrite the change with data from the secondary storage, likely against the user's intention.

The proposal is to support this scenario with a one-way sync flag. When the flag is enabled, the primary storage is treated as the only source of truth and the secondary storage is merely a mirror of the primary.
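
The rule above can be sketched in isolation as follows. This is a simplified illustration, not the code in this commit: the class name `OneWaySyncSketch`, the `Plan` holder, and the plain maps from note ID to last-modified time are assumptions made for the example. The real change is in `NotebookRepoSync.notesCheckDiff`, which works on `NoteInfo` lists and repo lookups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OneWaySyncSketch {

  /** Hypothetical result holder; the commit itself returns a Map keyed by strings. */
  public static final class Plan {
    public final List<String> push = new ArrayList<>();
    public final List<String> pull = new ArrayList<>();
    public final List<String> deleteFromDest = new ArrayList<>();
  }

  /**
   * Decides what to do with each note. Both maps go from note ID to the
   * note's last-modified time in milliseconds (an assumption of this sketch).
   */
  public static Plan classify(Map<String, Long> src, Map<String, Long> dst,
                              boolean oneWaySync) {
    Plan plan = new Plan();

    // Sorted copies keep the resulting lists in a predictable order.
    for (Map.Entry<String, Long> e : new TreeMap<>(src).entrySet()) {
      Long destTime = dst.get(e.getKey());
      if (destTime == null) {
        // Note is missing from the destination: push it.
        plan.push.add(e.getKey());
      } else if (!destTime.equals(e.getValue())) {
        if (e.getValue() > destTime || oneWaySync) {
          // Source is newer, or one-way sync forces the source to win.
          plan.push.add(e.getKey());
        } else {
          // Two-way sync and the destination is newer: pull.
          plan.pull.add(e.getKey());
        }
      }
    }

    for (String id : new TreeMap<>(dst).keySet()) {
      if (!src.containsKey(id)) {
        if (oneWaySync) {
          // Destination is only a mirror: remove the extraneous note.
          plan.deleteFromDest.add(id);
        } else {
          // Two-way sync: bring the note back into the source.
          plan.pull.add(id);
        }
      }
    }
    return plan;
  }

  public static void main(String[] args) {
    Map<String, Long> primary = new HashMap<>();
    primary.put("A", 2L);
    primary.put("B", 1L);
    Map<String, Long> secondary = new HashMap<>();
    secondary.put("A", 5L);
    secondary.put("C", 1L);

    Plan oneWay = classify(primary, secondary, true);
    System.out.println("push=" + oneWay.push + " pull=" + oneWay.pull
        + " delete=" + oneWay.deleteFromDest);
  }
}
```

With the flag off, the same inputs would instead pull the newer `A` and the destination-only `C` back into the primary storage, which is the behaviour this PR lets users opt out of.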

### What type of PR is it?
Improvement

### Todos

### What is the Jira issue?
<https://issues.apache.org/jira/browse/ZEPPELIN-1294>

### How should this be tested?
1. Set up two notebook storages through `ZEPPELIN_NOTEBOOK_STORAGE`, with the first being local file storage, i.e. `org.apache.zeppelin.notebook.repo.VFSNotebookRepo`. The second storage can be anything, e.g. `org.apache.zeppelin.notebook.repo.zeppelinhub.ZeppelinHubRepo`.
2. Add `export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC=true` to `zeppelin-env.sh`.
3. Start the Zeppelin server.
4. Delete a notebook directly from the notebook folder.
5. Refresh the notebooks from the Zeppelin UI.
6. Check the secondary storage and confirm that the notebook has been deleted there as well.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Documentation for the new flag has been added to `install.md`

Author: Hao Xia <ha...@optimizely.com>

Closes #1286 from jasonxh/hao/one-way-sync and squashes the following commits:

75a0670 [Hao Xia] Address PR comments
566b0ed [Hao Xia] Implement one-way sync for notebook repos


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/37696ea8
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/37696ea8
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/37696ea8

Branch: refs/heads/master
Commit: 37696ea8bd6a5c3b910f7d24141530f91f80391c
Parents: d794f6f
Author: Hao Xia <ha...@optimizely.com>
Authored: Fri Aug 5 09:57:36 2016 -0700
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Tue Aug 16 18:28:17 2016 +0900

----------------------------------------------------------------------
 conf/zeppelin-env.cmd.template                  |  1 +
 conf/zeppelin-env.sh.template                   |  1 +
 conf/zeppelin-site.xml.template                 |  6 +++
 docs/install/install.md                         |  6 +++
 .../zeppelin/conf/ZeppelinConfiguration.java    |  1 +
 .../notebook/repo/NotebookRepoSync.java         | 55 ++++++++++++++++----
 .../notebook/repo/NotebookRepoSyncTest.java     | 49 +++++++++++++++++
 7 files changed, 109 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37696ea8/conf/zeppelin-env.cmd.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.cmd.template b/conf/zeppelin-env.cmd.template
index de89674..fd42b32 100644
--- a/conf/zeppelin-env.cmd.template
+++ b/conf/zeppelin-env.cmd.template
@@ -35,6 +35,7 @@ REM set ZEPPELIN_IDENT_STRING   		REM A string representing this instance of zep
 REM set ZEPPELIN_NICENESS       		REM The scheduling priority for daemons. Defaults to 0.
 REM set ZEPPELIN_INTERPRETER_LOCALREPO         REM Local repository for interpreter's additional dependency loading
 REM set ZEPPELIN_NOTEBOOK_STORAGE		REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
+REM set ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC		REM If there are multiple notebook storages, should we treat the first one as the only source of truth?
 
 
 REM Spark interpreter configuration

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37696ea8/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index 14fdd54..a247ed5 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -36,6 +36,7 @@
 # export ZEPPELIN_NICENESS       		# The scheduling priority for daemons. Defaults to 0.
 # export ZEPPELIN_INTERPRETER_LOCALREPO         # Local repository for interpreter's additional dependency loading
 # export ZEPPELIN_NOTEBOOK_STORAGE 		# Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote).
+# export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC	# If there are multiple notebook storages, should we treat the first one as the only source of truth?
 
 #### Spark interpreter configuration ####
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37696ea8/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 3bdc273..879f5aa 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -165,6 +165,12 @@
 </property>
 
 <property>
+  <name>zeppelin.notebook.one.way.sync</name>
+  <value>false</value>
+  <description>If there are multiple notebook storages, should we treat the first one as the only source of truth?</description>
+</property>
+
+<property>
   <name>zeppelin.interpreter.dir</name>
   <value>interpreter</value>
   <description>Interpreter implementation base directory</description>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37696ea8/docs/install/install.md
----------------------------------------------------------------------
diff --git a/docs/install/install.md b/docs/install/install.md
index 9262716..b0f9fe4 100644
--- a/docs/install/install.md
+++ b/docs/install/install.md
@@ -375,6 +375,12 @@ You can configure Apache Zeppelin with both **environment variables** in `conf/z
     <td>Comma separated list of notebook storage</td>
   </tr>
   <tr>
+    <td>ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC</td>
+    <td>zeppelin.notebook.one.way.sync</td>
+    <td>false</td>
+    <td>If there are multiple notebook storages, should we treat the first one as the only source of truth?</td>
+  </tr>
+  <tr>
     <td>ZEPPELIN_INTERPRETERS</td>
     <td>zeppelin.interpreters</td>
   <description></description>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37696ea8/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 43fb6be..d146a25 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -543,6 +543,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
     ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
     ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()),
+    ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
     ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
         System.getProperty("os.name")
                 .startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37696ea8/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
index e8c2c08..7208726 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java
@@ -44,11 +44,13 @@ public class NotebookRepoSync implements NotebookRepo {
   private static final int maxRepoNum = 2;
   private static final String pushKey = "pushNoteIDs";
   private static final String pullKey = "pullNoteIDs";
+  private static final String delDstKey = "delDstNoteIDs";
 
   private static ZeppelinConfiguration config;
   private static final String defaultStorage = "org.apache.zeppelin.notebook.repo.VFSNotebookRepo";
 
   private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
+  private final boolean oneWaySync;
 
   /**
    * @param noteIndex
@@ -58,6 +60,7 @@ public class NotebookRepoSync implements NotebookRepo {
   @SuppressWarnings("static-access")
   public NotebookRepoSync(ZeppelinConfiguration conf) {
     config = conf;
+    oneWaySync = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC);
     String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
     if (allStorageClassNames.isEmpty()) {
       allStorageClassNames = defaultStorage;
@@ -182,6 +185,8 @@ public class NotebookRepoSync implements NotebookRepo {
     Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
     List<String> pushNoteIDs = noteIDs.get(pushKey);
     List<String> pullNoteIDs = noteIDs.get(pullKey);
+    List<String> delDstNoteIDs = noteIDs.get(delDstKey);
+
     if (!pushNoteIDs.isEmpty()) {
       LOG.info("Notes with the following IDs will be pushed");
       for (String id : pushNoteIDs) {
@@ -202,6 +207,16 @@ public class NotebookRepoSync implements NotebookRepo {
       LOG.info("Nothing to pull");
     }
 
+    if (!delDstNoteIDs.isEmpty()) {
+      LOG.info("Notes with the following IDs will be deleted from dest");
+      for (String id : delDstNoteIDs) {
+        LOG.info("ID : " + id);
+      }
+      deleteNotes(delDstNoteIDs, dstRepo);
+    } else {
+      LOG.info("Nothing to delete from dest");
+    }
+
     LOG.info("Sync ended");
   }
 
@@ -216,6 +231,12 @@ public class NotebookRepoSync implements NotebookRepo {
     }
   }
 
+  private void deleteNotes(List<String> ids, NotebookRepo repo) throws IOException {
+    for (String id : ids) {
+      repo.remove(id, null);
+    }
+  }
+
   public int getRepoCount() {
     return repos.size();
   }
@@ -237,6 +258,7 @@ public class NotebookRepoSync implements NotebookRepo {
       throws IOException {
     List <String> pushIDs = new ArrayList<String>();
     List <String> pullIDs = new ArrayList<String>();
+    List <String> delDstIDs = new ArrayList<String>();
 
     NoteInfo dnote;
     Date sdate, ddate;
@@ -246,14 +268,18 @@ public class NotebookRepoSync implements NotebookRepo {
         /* note exists in source and destination storage systems */
         sdate = lastModificationDate(sourceRepo.get(snote.getId(), null));
         ddate = lastModificationDate(destRepo.get(dnote.getId(), null));
-        if (sdate.after(ddate)) {
-          /* source contains more up to date note - push */
-          pushIDs.add(snote.getId());
-          LOG.info("Modified note is added to push list : " + sdate);
-        } else if (sdate.compareTo(ddate) != 0) {
-          /* destination contains more up to date note - pull */
-          LOG.info("Modified note is added to pull list : " + ddate);
-          pullIDs.add(snote.getId());
+
+        if (sdate.compareTo(ddate) != 0) {
+          if (sdate.after(ddate) || oneWaySync) {
+            /* if source contains more up to date note - push
+             * if oneWaySync is enabled, always push no matter who's newer */
+            pushIDs.add(snote.getId());
+            LOG.info("Modified note is added to push list : " + sdate);
+          } else {
+            /* destination contains more up to date note - pull */
+            LOG.info("Modified note is added to pull list : " + ddate);
+            pullIDs.add(snote.getId());
+          }
         }
       } else {
         /* note exists in source storage, and absent in destination
@@ -266,14 +292,23 @@ public class NotebookRepoSync implements NotebookRepo {
     for (NoteInfo note : destNotes) {
       dnote = containsID(sourceNotes, note.getId());
       if (dnote == null) {
-        /* note exists in destination storage, and absent in source - pull*/
-        pullIDs.add(note.getId());
+        /* note exists in destination storage, and absent in source */
+        if (oneWaySync) {
+          /* if oneWaySync is enabled, delete the note from destination */
+          LOG.info("Extraneous note is added to delete dest list : " + note.getId());
+          delDstIDs.add(note.getId());
+        } else {
+          /* if oneWaySync is disabled, pull the note from destination */
+          LOG.info("Missing note is added to pull list : " + note.getId());
+          pullIDs.add(note.getId());
+        }
       }
     }
 
     Map<String, List<String>> map = new HashMap<String, List<String>>();
     map.put(pushKey, pushIDs);
     map.put(pullKey, pullIDs);
+    map.put(delDstKey, delDstIDs);
     return map;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/37696ea8/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
index 0c67d79..bd13120 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
@@ -84,6 +84,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
     System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
     System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
     System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.VFSNotebookRepo,org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock");
+    System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "false");
     LOG.info("main Note dir : " + mainNotePath);
     LOG.info("secondary note dir : " + secNotePath);
     conf = ZeppelinConfiguration.create();
@@ -221,6 +222,54 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
   }
 
   @Test
+  public void testOneWaySyncOnReloadedList() throws IOException, SchedulerException {
+    System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath());
+    System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "true");
+    conf = ZeppelinConfiguration.create();
+    notebookRepoSync = new NotebookRepoSync(conf);
+    notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search,
+            notebookAuthorization, credentials);
+
+    // check that both storage repos are empty
+    assertTrue(notebookRepoSync.getRepoCount() > 1);
+    assertEquals(0, notebookRepoSync.list(0, null).size());
+    assertEquals(0, notebookRepoSync.list(1, null).size());
+
+    File srcDir = new File("src/test/resources/2A94M5J1Z");
+    File destDir = new File(secNotebookDir + "/2A94M5J1Z");
+
+    // copy manually new notebook into secondary storage repo and check repos
+    try {
+      FileUtils.copyDirectory(srcDir, destDir);
+    } catch (IOException e) {
+      LOG.error(e.toString(), e);
+    }
+    assertEquals(0, notebookRepoSync.list(0, null).size());
+    assertEquals(1, notebookRepoSync.list(1, null).size());
+
+    // after reloading the notebook should be wiped from secondary storage
+    notebookSync.reloadAllNotes(null);
+    assertEquals(0, notebookRepoSync.list(0, null).size());
+    assertEquals(0, notebookRepoSync.list(1, null).size());
+
+    destDir = new File(mainNotebookDir + "/2A94M5J1Z");
+
+    // copy manually new notebook into primary storage repo and check repos
+    try {
+      FileUtils.copyDirectory(srcDir, destDir);
+    } catch (IOException e) {
+      LOG.error(e.toString(), e);
+    }
+    assertEquals(1, notebookRepoSync.list(0, null).size());
+    assertEquals(0, notebookRepoSync.list(1, null).size());
+
+    // after reloading notebooks repos should be synchronized
+    notebookSync.reloadAllNotes(null);
+    assertEquals(1, notebookRepoSync.list(0, null).size());
+    assertEquals(1, notebookRepoSync.list(1, null).size());
+  }
+
+  @Test
   public void testCheckpointOneStorage() throws IOException, SchedulerException {
     System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.GitNotebookRepo");
     ZeppelinConfiguration vConf = ZeppelinConfiguration.create();