You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/06/03 12:56:26 UTC

[zeppelin] branch master updated: [ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374)

This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 525d3617b0 [ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374)
525d3617b0 is described below

commit 525d3617b08573dc0272cc94735721a09b21716a
Author: Leomax_Sun <28...@qq.com>
AuthorDate: Fri Jun 3 20:56:17 2022 +0800

    [ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374)
---
 .../org/apache/zeppelin/notebook/NoteManager.java  |  19 ++--
 .../apache/zeppelin/notebook/NoteManagerTest.java  | 119 +++++++++++++++++++++
 2 files changed, 131 insertions(+), 7 deletions(-)

diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java
index ba6e443f8b..e2d393af54 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
@@ -79,7 +80,7 @@ public class NoteManager {
   // build the tree structure of notes
   private void init() throws IOException {
     this.notesInfo = notebookRepo.list(AuthenticationInfo.ANONYMOUS).values().stream()
-        .collect(Collectors.toMap(NoteInfo::getId, NoteInfo::getPath));
+        .collect(Collectors.toConcurrentMap(NoteInfo::getId, NoteInfo::getPath));
     this.notesInfo.entrySet().stream()
         .forEach(entry ->
         {
@@ -181,7 +182,11 @@ public class NoteManager {
     } else {
       addOrUpdateNoteNode(new NoteInfo(note));
       noteCache.putNote(note);
-      this.notebookRepo.save(note, subject);
+      // Make sure to execute `notebookRepo.save()` successfully in concurrent context
+      // Otherwise, the NullPointerException will be thrown when invoking notebookRepo.get() in the following operations.
+      synchronized (this) {
+        this.notebookRepo.save(note, subject);
+      }
     }
   }
 
@@ -218,7 +223,6 @@ public class NoteManager {
   public void moveNote(String noteId,
                        String newNotePath,
                        AuthenticationInfo subject) throws IOException {
-    String notePath = this.notesInfo.get(noteId);
     if (noteId == null) {
       throw new IOException("No metadata found for this note: " + noteId);
     }
@@ -228,6 +232,7 @@ public class NoteManager {
     }
 
     // move the old NoteNode from notePath to newNotePath
+    String notePath = this.notesInfo.get(noteId);
     NoteNode noteNode = getNoteNode(notePath);
     noteNode.getParent().removeNote(getNoteName(notePath));
     noteNode.setNotePath(newNotePath);
@@ -317,10 +322,10 @@ public class NoteManager {
    */
   public <T> T processNote(String noteId, boolean reload, NoteProcessor<T> noteProcessor)
       throws IOException {
-    String notePath = this.notesInfo.get(noteId);
-    if (notePath == null) {
+    if (this.notesInfo == null || noteId == null || !this.notesInfo.containsKey(noteId)) {
       return noteProcessor.process(null);
     }
+    String notePath = this.notesInfo.get(noteId);
     NoteNode noteNode = getNoteNode(notePath);
     return noteNode.loadAndProcessNote(reload, noteProcessor);
   }
@@ -433,9 +438,9 @@ public class NoteManager {
     private NoteCache noteCache;
 
     // noteName -> NoteNode
-    private Map<String, NoteNode> notes = new HashMap<>();
+    private Map<String, NoteNode> notes = new ConcurrentHashMap<>();
     // folderName -> Folder
-    private Map<String, Folder> subFolders = new HashMap<>();
+    private Map<String, Folder> subFolders = new ConcurrentHashMap<>();
 
     public Folder(String name, NotebookRepo notebookRepo, NoteCache noteCache) {
       this.name = name;
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java
index 822b6e87fe..f8d58dd272 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java
@@ -28,6 +28,11 @@ import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -158,4 +163,118 @@ public class NoteManagerTest {
     assertFalse(noteManager.containsNote(noteNew2.getPath()));
     assertEquals(cacheThreshold, noteManager.getCacheSize());
   }
+
+  @Test
+  public void testConcurrentOperation() throws Exception {
+    int threshold = 10, noteNum = 150;
+    Map<Integer, String> notes = new ConcurrentHashMap<>();
+    ExecutorService threadPool = Executors.newFixedThreadPool(threshold);
+    // Save note concurrently
+    ConcurrentTask saveNote = new ConcurrentTaskSaveNote(threadPool, noteNum, notes, "/prod/note%s");
+    saveNote.exec();
+    // Move note concurrently
+    ConcurrentTask moveNote = new ConcurrentTaskMoveNote(threadPool, noteNum, notes, "/dev/project_%s/my_note%s");
+    moveNote.exec();
+    // Move folder concurrently
+    ConcurrentTask moveFolder = new ConcurrentTaskMoveFolder(threadPool, noteNum, notes, "/staging/note_%s/my_note%s");
+    moveFolder.exec();
+    // Remove note concurrently
+    ConcurrentTask removeNote = new ConcurrentTaskRemoveNote(threadPool, noteNum, notes, null);
+    removeNote.exec();
+    threadPool.shutdown();
+  }
+
+  abstract class ConcurrentTask {
+    private ExecutorService threadPool;
+    private int noteNum;
+    private Map<Integer, String> notes;
+    private String pathPattern;
+
+    public ConcurrentTask(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      this.threadPool = threadPool;
+      this.noteNum = noteNum;
+      this.notes = notes;
+      this.pathPattern = pathPattern;
+    }
+
+    public abstract void run(int index) throws IOException;
+
+    public void exec() throws Exception {
+      // Simulate concurrent operation
+      CountDownLatch latch = new CountDownLatch(noteNum);
+      for (int i = 0; i < noteNum; i++) {
+        int index = i;
+        threadPool.execute(() -> {
+          try {
+            this.run(index);
+            latch.countDown();
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        });
+      }
+      // wait till all tasks are completed with 5 seconds as timeout threshold
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      this.checkPathByPattern();
+    }
+
+    private void checkPathByPattern() throws IOException {
+      assertEquals(this.notes.size(), noteManager.getNotesInfo().size());
+      if (notes.isEmpty()) return;
+      for (Integer key : this.notes.keySet()) {
+        String expectPath = String.format(this.pathPattern, key, key);
+        assertEquals(expectPath, noteManager.processNote(notes.get(key), n -> n).getPath());
+      }
+    }
+  }
+
+  class ConcurrentTaskSaveNote extends ConcurrentTask {
+    public ConcurrentTaskSaveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      super(threadPool, noteNum, notes, pathPattern);
+    }
+
+    @Override
+    public void run(int index) throws IOException {
+      String tarPath = String.format(super.pathPattern, index, index);
+      Note note = createNote(tarPath);
+      noteManager.saveNote(note);
+      super.notes.put(index, note.getId());
+    }
+  }
+
+  class ConcurrentTaskMoveNote extends ConcurrentTask {
+    public ConcurrentTaskMoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      super(threadPool, noteNum, notes, pathPattern);
+    }
+
+    @Override
+    public void run(int index) throws IOException {
+      String tarPath = String.format(super.pathPattern, index, index);
+      noteManager.moveNote(super.notes.get(index), tarPath, AuthenticationInfo.ANONYMOUS);
+    }
+  }
+
+  class ConcurrentTaskMoveFolder extends ConcurrentTask {
+    public ConcurrentTaskMoveFolder(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      super(threadPool, noteNum, notes, pathPattern);
+    }
+
+    @Override
+    public void run(int index) throws IOException {
+      String curPath = "/dev/project_" + index, tarPath = "/staging/note_" + index;
+      noteManager.moveFolder(curPath, tarPath, AuthenticationInfo.ANONYMOUS);
+    }
+  }
+
+  class ConcurrentTaskRemoveNote extends ConcurrentTask {
+    public ConcurrentTaskRemoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      super(threadPool, noteNum, notes, pathPattern);
+    }
+
+    @Override
+    public void run(int index) throws IOException {
+      noteManager.removeNote(super.notes.get(index), AuthenticationInfo.ANONYMOUS);
+      super.notes.remove(index);
+    }
+  }
 }