You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2022/05/30 08:27:14 UTC

[GitHub] [zeppelin] Reamer commented on a diff in pull request #4374: [ZEPPELIN-5744] Allow NoteManager for concurrent operation

Reamer commented on code in PR #4374:
URL: https://github.com/apache/zeppelin/pull/4374#discussion_r884557639


##########
zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java:
##########
@@ -158,4 +166,117 @@ public void testLruCache() throws IOException {
     assertFalse(noteManager.containsNote(noteNew2.getPath()));
     assertEquals(cacheThreshold, noteManager.getCacheSize());
   }
+
+  /**
+   * Runs four phases (save note, move note, move folder, remove note) against the note manager,
+   * each phase submitting {@code noteNum} tasks to a shared fixed-size thread pool.
+   *
+   * @throws Exception if any phase fails or the waiting thread is interrupted
+   */
+  @Test
+  public void testConcurrentOperation() throws Exception {
+    int threshold = 10;
+    int noteNum = 150;
+    Map<Integer, String> notes = new ConcurrentHashMap<>();
+    ExecutorService threadPool = Executors.newFixedThreadPool(threshold);
+    try {
+      // Save note concurrently
+      ConcurrentTask saveNote = new ConcurrentTaskSaveNote(threadPool, noteNum, notes, "/prod/note%s");
+      saveNote.exec();
+      // Move note concurrently
+      ConcurrentTask moveNote = new ConcurrentTaskMoveNote(threadPool, noteNum, notes, "/dev/project_%s/my_note%s");
+      moveNote.exec();
+      // Move folder concurrently
+      ConcurrentTask moveFolder = new ConcurrentTaskMoveFolder(threadPool, noteNum, notes, "/staging/note_%s/my_note%s");
+      moveFolder.exec();
+      // Remove note concurrently
+      ConcurrentTask removeNote = new ConcurrentTaskRemoveNote(threadPool, noteNum, notes, null);
+      removeNote.exec();
+    } finally {
+      // Release the worker threads even when one of the phases throws.
+      threadPool.shutdown();
+    }
+  }
+
+  /**
+   * One phase of the concurrent test: submits {@code noteNum} invocations of {@link #run(int)}
+   * to the thread pool, waits for all of them, then verifies the note paths held by the
+   * note manager against {@code pathPattern}.
+   */
+  abstract class ConcurrentTask {
+    private final ExecutorService threadPool;
+    private final int noteNum;
+    private final Map<Integer, String> notes;
+    private final String pathPattern;
+
+    /**
+     * @param threadPool  executor that runs the individual operations
+     * @param noteNum     number of operations to submit (indices {@code 0..noteNum-1})
+     * @param notes       shared map from index to note id
+     * @param pathPattern format string, applied as {@code String.format(pattern, index, index)},
+     *                    giving the expected note path after this phase
+     */
+    public ConcurrentTask(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      this.threadPool = threadPool;
+      this.noteNum = noteNum;
+      this.notes = notes;
+      this.pathPattern = pathPattern;
+    }
+
+    /**
+     * Performs the operation for a single index; called from a pool thread.
+     *
+     * @param index index of the operation, in {@code 0..noteNum-1}
+     * @throws IOException if the underlying operation fails
+     */
+    public abstract void run(int index) throws IOException;
+
+    /**
+     * Submits all operations, blocks until each one has finished, and then checks the paths.
+     *
+     * @throws Exception if the wait is interrupted or the path check fails
+     * @throws AssertionError if any submitted operation threw; the first failure is the cause
+     */
+    public void exec() throws Exception {
+      // Simulate concurrent operation
+      CountDownLatch latch = new CountDownLatch(noteNum);
+      // Fully qualified so that no additional import is needed in the file.
+      java.util.concurrent.atomic.AtomicReference<Throwable> firstFailure =
+          new java.util.concurrent.atomic.AtomicReference<>();
+      for (int i = 0; i < noteNum; i++) {
+        int index = i;
+        threadPool.execute(() -> {
+          try {
+            this.run(index);
+          } catch (IOException | RuntimeException e) {
+            // Remember only the first failure; it is rethrown on the waiting thread.
+            firstFailure.compareAndSet(null, e);
+          } finally {
+            // Count down on failure too, otherwise latch.await() would block forever.
+            latch.countDown();
+          }
+        });
+      }
+      latch.await(); //wait till all tasks are completed
+      Throwable failure = firstFailure.get();
+      if (failure != null) {
+        throw new AssertionError("Concurrent task failed", failure);
+      }
+      this.checkPathByPattern();
+    }
+
+    /**
+     * Asserts that the note manager reports as many notes as {@code notes} holds and that
+     * every recorded note is located at the path produced by {@code pathPattern}.
+     */
+    private void checkPathByPattern() throws IOException {
+      assertEquals(this.notes.size(), noteManager.getNotesInfo().size());
+      if (notes.isEmpty()) {
+        return;
+      }
+      for (Map.Entry<Integer, String> entry : this.notes.entrySet()) {
+        Integer key = entry.getKey();
+        String expectPath = String.format(this.pathPattern, key, key);
+        assertEquals(expectPath, noteManager.processNote(entry.getValue(), n -> n).getPath());
+      }
+    }
+  }
+
+  /**
+   * Phase that creates a note at the path derived from the pattern, saves it through the
+   * note manager and records its id in the shared map under the operation index.
+   */
+  class ConcurrentTaskSaveNote extends ConcurrentTask {
+    public ConcurrentTaskSaveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      super(threadPool, noteNum, notes, pathPattern);
+    }
+
+    /**
+     * Creates and saves the note for {@code index}.
+     *
+     * @param index index used to build the note path and as key in the shared map
+     * @throws IOException if creating or saving the note fails
+     */
+    @Override
+    public void run(int index) throws IOException {
+      String tarPath = String.format(super.pathPattern, index, index);
+      Note note = createNote(tarPath);
+      noteManager.saveNote(note);
+      super.notes.put(index, note.getId());
+    }
+  }
+
+  /**
+   * Phase that moves the note recorded for each index to the path derived from the pattern.
+   */
+  class ConcurrentTaskMoveNote extends ConcurrentTask {
+    public ConcurrentTaskMoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      super(threadPool, noteNum, notes, pathPattern);
+    }
+
+    /**
+     * Moves the note stored under {@code index} to its target path as the anonymous user.
+     *
+     * @param index key of the note id in the shared map; also used to build the target path
+     * @throws IOException if the move fails
+     */
+    @Override
+    public void run(int index) throws IOException {
+      String tarPath = String.format(super.pathPattern, index, index);
+      noteManager.moveNote(super.notes.get(index), tarPath, AuthenticationInfo.ANONYMOUS);
+    }
+  }
+
+  /**
+   * Phase that moves the folder {@code /dev/project_<index>} to {@code /staging/note_<index>}.
+   * The folder paths are hard-coded here; {@code run} does not read the path pattern itself.
+   */
+  class ConcurrentTaskMoveFolder extends ConcurrentTask {
+    public ConcurrentTaskMoveFolder(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+      super(threadPool, noteNum, notes, pathPattern);
+    }
+
+    /**
+     * Moves the folder for {@code index} as the anonymous user.
+     *
+     * @param index suffix of both the source and the target folder name
+     * @throws IOException if the move fails
+     */
+    @Override
+    public void run(int index) throws IOException {
+      String curPath = "/dev/project_" + index;
+      String tarPath = "/staging/note_" + index;
+      noteManager.moveFolder(curPath, tarPath, AuthenticationInfo.ANONYMOUS);
+    }
+  }
+
+  class ConcurrentTaskRemoveNote extends ConcurrentTask {
+    public ConcurrentTaskRemoveNote(ExecutorService threadPool, int noteNum, Map notes, String pathPattern) {

Review Comment:
   ```suggestion
       public ConcurrentTaskRemoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@zeppelin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org