You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/06/03 12:56:26 UTC
[zeppelin] branch master updated: [ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374)
This is an automated email from the ASF dual-hosted git repository.
jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 525d3617b0 [ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374)
525d3617b0 is described below
commit 525d3617b08573dc0272cc94735721a09b21716a
Author: Leomax_Sun <28...@qq.com>
AuthorDate: Fri Jun 3 20:56:17 2022 +0800
[ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374)
---
.../org/apache/zeppelin/notebook/NoteManager.java | 19 ++--
.../apache/zeppelin/notebook/NoteManagerTest.java | 119 +++++++++++++++++++++
2 files changed, 131 insertions(+), 7 deletions(-)
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java
index ba6e443f8b..e2d393af54 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import javax.inject.Inject;
@@ -79,7 +80,7 @@ public class NoteManager {
// build the tree structure of notes
private void init() throws IOException {
this.notesInfo = notebookRepo.list(AuthenticationInfo.ANONYMOUS).values().stream()
- .collect(Collectors.toMap(NoteInfo::getId, NoteInfo::getPath));
+ .collect(Collectors.toConcurrentMap(NoteInfo::getId, NoteInfo::getPath));
this.notesInfo.entrySet().stream()
.forEach(entry ->
{
@@ -181,7 +182,11 @@ public class NoteManager {
} else {
addOrUpdateNoteNode(new NoteInfo(note));
noteCache.putNote(note);
- this.notebookRepo.save(note, subject);
+ // Make sure to execute `notebookRepo.save()` successfully in concurrent context
+ // Otherwise, the NullPointerException will be thrown when invoking notebookRepo.get() in the following operations.
+ synchronized (this) {
+ this.notebookRepo.save(note, subject);
+ }
}
}
@@ -218,7 +223,6 @@ public class NoteManager {
public void moveNote(String noteId,
String newNotePath,
AuthenticationInfo subject) throws IOException {
- String notePath = this.notesInfo.get(noteId);
if (noteId == null) {
throw new IOException("No metadata found for this note: " + noteId);
}
@@ -228,6 +232,7 @@ public class NoteManager {
}
// move the old NoteNode from notePath to newNotePath
+ String notePath = this.notesInfo.get(noteId);
NoteNode noteNode = getNoteNode(notePath);
noteNode.getParent().removeNote(getNoteName(notePath));
noteNode.setNotePath(newNotePath);
@@ -317,10 +322,10 @@ public class NoteManager {
*/
public <T> T processNote(String noteId, boolean reload, NoteProcessor<T> noteProcessor)
throws IOException {
- String notePath = this.notesInfo.get(noteId);
- if (notePath == null) {
+ if (this.notesInfo == null || noteId == null || !this.notesInfo.containsKey(noteId)) {
return noteProcessor.process(null);
}
+ String notePath = this.notesInfo.get(noteId);
NoteNode noteNode = getNoteNode(notePath);
return noteNode.loadAndProcessNote(reload, noteProcessor);
}
@@ -433,9 +438,9 @@ public class NoteManager {
private NoteCache noteCache;
// noteName -> NoteNode
- private Map<String, NoteNode> notes = new HashMap<>();
+ private Map<String, NoteNode> notes = new ConcurrentHashMap<>();
// folderName -> Folder
- private Map<String, Folder> subFolders = new HashMap<>();
+ private Map<String, Folder> subFolders = new ConcurrentHashMap<>();
public Folder(String name, NotebookRepo notebookRepo, NoteCache noteCache) {
this.name = name;
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java
index 822b6e87fe..f8d58dd272 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java
@@ -28,6 +28,11 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -158,4 +163,118 @@ public class NoteManagerTest {
assertFalse(noteManager.containsNote(noteNew2.getPath()));
assertEquals(cacheThreshold, noteManager.getCacheSize());
}
+
+ @Test
+ public void testConcurrentOperation() throws Exception {
+ int threshold = 10, noteNum = 150;
+ Map<Integer, String> notes = new ConcurrentHashMap<>();
+ ExecutorService threadPool = Executors.newFixedThreadPool(threshold);
+ // Save note concurrently
+ ConcurrentTask saveNote = new ConcurrentTaskSaveNote(threadPool, noteNum, notes, "/prod/note%s");
+ saveNote.exec();
+ // Move note concurrently
+ ConcurrentTask moveNote = new ConcurrentTaskMoveNote(threadPool, noteNum, notes, "/dev/project_%s/my_note%s");
+ moveNote.exec();
+ // Move folder concurrently
+ ConcurrentTask moveFolder = new ConcurrentTaskMoveFolder(threadPool, noteNum, notes, "/staging/note_%s/my_note%s");
+ moveFolder.exec();
+ // Remove note concurrently
+ ConcurrentTask removeNote = new ConcurrentTaskRemoveNote(threadPool, noteNum, notes, null);
+ removeNote.exec();
+ threadPool.shutdown();
+ }
+
+ abstract class ConcurrentTask {
+ private ExecutorService threadPool;
+ private int noteNum;
+ private Map<Integer, String> notes;
+ private String pathPattern;
+
+ public ConcurrentTask(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+ this.threadPool = threadPool;
+ this.noteNum = noteNum;
+ this.notes = notes;
+ this.pathPattern = pathPattern;
+ }
+
+ public abstract void run(int index) throws IOException;
+
+ public void exec() throws Exception {
+ // Simulate concurrent operation
+ CountDownLatch latch = new CountDownLatch(noteNum);
+ for (int i = 0; i < noteNum; i++) {
+ int index = i;
+ threadPool.execute(() -> {
+ try {
+ this.run(index);
+ latch.countDown();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ // wait till all tasks are completed with 5 seconds as timeout threshold
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ this.checkPathByPattern();
+ }
+
+ private void checkPathByPattern() throws IOException {
+ assertEquals(this.notes.size(), noteManager.getNotesInfo().size());
+ if (notes.isEmpty()) return;
+ for (Integer key : this.notes.keySet()) {
+ String expectPath = String.format(this.pathPattern, key, key);
+ assertEquals(expectPath, noteManager.processNote(notes.get(key), n -> n).getPath());
+ }
+ }
+ }
+
+ class ConcurrentTaskSaveNote extends ConcurrentTask {
+ public ConcurrentTaskSaveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+ super(threadPool, noteNum, notes, pathPattern);
+ }
+
+ @Override
+ public void run(int index) throws IOException {
+ String tarPath = String.format(super.pathPattern, index, index);
+ Note note = createNote(tarPath);
+ noteManager.saveNote(note);
+ super.notes.put(index, note.getId());
+ }
+ }
+
+ class ConcurrentTaskMoveNote extends ConcurrentTask {
+ public ConcurrentTaskMoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+ super(threadPool, noteNum, notes, pathPattern);
+ }
+
+ @Override
+ public void run(int index) throws IOException {
+ String tarPath = String.format(super.pathPattern, index, index);
+ noteManager.moveNote(super.notes.get(index), tarPath, AuthenticationInfo.ANONYMOUS);
+ }
+ }
+
+ class ConcurrentTaskMoveFolder extends ConcurrentTask {
+ public ConcurrentTaskMoveFolder(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+ super(threadPool, noteNum, notes, pathPattern);
+ }
+
+ @Override
+ public void run(int index) throws IOException {
+ String curPath = "/dev/project_" + index, tarPath = "/staging/note_" + index;
+ noteManager.moveFolder(curPath, tarPath, AuthenticationInfo.ANONYMOUS);
+ }
+ }
+
+ class ConcurrentTaskRemoveNote extends ConcurrentTask {
+ public ConcurrentTaskRemoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) {
+ super(threadPool, noteNum, notes, pathPattern);
+ }
+
+ @Override
+ public void run(int index) throws IOException {
+ noteManager.removeNote(super.notes.get(index), AuthenticationInfo.ANONYMOUS);
+ super.notes.remove(index);
+ }
+ }
}