You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/05/12 12:49:55 UTC
[zeppelin] branch master updated: [ZEPPELIN-5357] Remove lucene
writer lock during shutdown
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new f65ba50 [ZEPPELIN-5357] Remove lucene writer lock during shutdown
f65ba50 is described below
commit f65ba502031c6934b941771da322c572b75581a4
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Thu May 6 13:13:29 2021 +0200
[ZEPPELIN-5357] Remove lucene writer lock during shutdown
### What is this PR for?
Remove the lucene writer lock during shutdown of the jetty service.
### What type of PR is it?
- Bug Fix
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5357
### How should this be tested?
* CI
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #4109 from Reamer/lucene_close and squashes the following commits:
b1dc7f932 [Philipp Dallig] Rewrite NoteEventAsyncListener to use a ThreadPoolExecutor instead of a simple Thread
7bde85927 [Philipp Dallig] close SearchService
---
.../zeppelin/service/NotebookServiceTest.java | 4 +-
.../zeppelin/notebook/NoteEventAsyncListener.java | 121 ++++++++++-----------
.../org/apache/zeppelin/search/LuceneSearch.java | 13 ++-
.../apache/zeppelin/search/NoSearchService.java | 2 +-
.../org/apache/zeppelin/search/SearchService.java | 3 +
.../apache/zeppelin/search/LuceneSearchTest.java | 55 +++++-----
6 files changed, 103 insertions(+), 95 deletions(-)
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index 7168bbd..4700211 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -82,6 +82,7 @@ public class NotebookServiceTest {
private static NotebookService notebookService;
private File notebookDir;
+ private SearchService searchService;
private ServiceContext context =
new ServiceContext(AuthenticationInfo.ANONYMOUS, new HashSet<>());
@@ -116,7 +117,7 @@ public class NotebookServiceTest {
when(mockInterpreterSetting.isUserAuthorized(any())).thenReturn(true);
when(mockInterpreterGroup.getInterpreterSetting()).thenReturn(mockInterpreterSetting);
when(mockInterpreterSetting.getStatus()).thenReturn(InterpreterSetting.Status.READY);
- SearchService searchService = new LuceneSearch(zeppelinConfiguration);
+ searchService = new LuceneSearch(zeppelinConfiguration);
Credentials credentials = new Credentials();
NoteManager noteManager = new NoteManager(notebookRepo);
AuthorizationService authorizationService = new AuthorizationService(noteManager, zeppelinConfiguration);
@@ -147,6 +148,7 @@ public class NotebookServiceTest {
@After
public void tearDown() {
notebookDir.delete();
+ searchService.close();
}
@Test
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
index 97f799c..d451d9a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
@@ -18,28 +18,32 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.util.ExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.BlockingQueue;
+import java.io.Closeable;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
* A special NoteEventListener which handles events asynchronously
*/
-public abstract class NoteEventAsyncListener implements NoteEventListener {
+public abstract class NoteEventAsyncListener implements NoteEventListener, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(NoteEventAsyncListener.class);
- private BlockingQueue<NoteEvent> eventsQueue = new LinkedBlockingQueue<>();
-
- private Thread eventHandlerThread;
+ private final ThreadPoolExecutor executor;
+ private final String name;
- public NoteEventAsyncListener(String name) {
- this.eventHandlerThread = new EventHandlingThread();
- this.eventHandlerThread.setName(name);
- this.eventHandlerThread.start();
+ protected NoteEventAsyncListener(String name) {
+ this.name = name;
+ executor = new ThreadPoolExecutor(0, 1, 1, TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(), new SchedulerThreadFactory(name));
}
public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception;
@@ -55,84 +59,79 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception;
+ @Override
public void close() {
- this.eventHandlerThread.interrupt();
+ ExecutorUtil.softShutdown(name, executor, 2, TimeUnit.SECONDS);
}
@Override
public void onNoteCreate(Note note, AuthenticationInfo subject) {
- eventsQueue.add(new NoteCreateEvent(note, subject));
+ executor.execute(new EventHandling(new NoteCreateEvent(note)));
}
@Override
public void onNoteRemove(Note note, AuthenticationInfo subject) {
- eventsQueue.add(new NoteRemoveEvent(note, subject));
+ executor.execute(new EventHandling(new NoteRemoveEvent(note)));
}
@Override
public void onNoteUpdate(Note note, AuthenticationInfo subject) {
- eventsQueue.add(new NoteUpdateEvent(note, subject));
+ executor.execute(new EventHandling(new NoteUpdateEvent(note)));
}
@Override
public void onParagraphCreate(Paragraph p) {
- eventsQueue.add(new ParagraphCreateEvent(p));
+ executor.execute(new EventHandling(new ParagraphCreateEvent(p)));
}
@Override
public void onParagraphRemove(Paragraph p) {
- eventsQueue.add(new ParagraphRemoveEvent(p));
+ executor.execute(new EventHandling(new ParagraphRemoveEvent(p)));
}
@Override
public void onParagraphUpdate(Paragraph p) {
- eventsQueue.add(new ParagraphUpdateEvent(p));
+ executor.execute(new EventHandling(new ParagraphUpdateEvent(p)));
}
@Override
public void onParagraphStatusChange(Paragraph p, Job.Status status) {
- eventsQueue.add(new ParagraphStatusChangeEvent(p));
+ executor.execute(new EventHandling(new ParagraphStatusChangeEvent(p)));
}
- class EventHandlingThread extends Thread {
+ class EventHandling implements Runnable {
+
+ private final NoteEvent event;
+ public EventHandling(NoteEvent event) {
+ this.event = event;
+ }
@Override
public void run() {
- while(!Thread.interrupted()) {
- try {
- NoteEvent event = eventsQueue.take();
- if (event instanceof NoteCreateEvent) {
- handleNoteCreateEvent((NoteCreateEvent) event);
- } else if (event instanceof NoteRemoveEvent) {
- handleNoteRemoveEvent((NoteRemoveEvent) event);
- } else if (event instanceof NoteUpdateEvent) {
- handleNoteUpdateEvent((NoteUpdateEvent) event);
- } else if (event instanceof ParagraphCreateEvent) {
- handleParagraphCreateEvent((ParagraphCreateEvent) event);
- } else if (event instanceof ParagraphRemoveEvent) {
- handleParagraphRemoveEvent((ParagraphRemoveEvent) event);
- } else if (event instanceof ParagraphUpdateEvent) {
- handleParagraphUpdateEvent((ParagraphUpdateEvent) event);
- } else {
- throw new RuntimeException("Unknown event: " + event.getClass().getSimpleName());
- }
- } catch (Exception e) {
- LOGGER.error("Fail to handle NoteEvent", e);
+ try {
+ if (event instanceof NoteCreateEvent) {
+ handleNoteCreateEvent((NoteCreateEvent) event);
+ } else if (event instanceof NoteRemoveEvent) {
+ handleNoteRemoveEvent((NoteRemoveEvent) event);
+ } else if (event instanceof NoteUpdateEvent) {
+ handleNoteUpdateEvent((NoteUpdateEvent) event);
+ } else if (event instanceof ParagraphCreateEvent) {
+ handleParagraphCreateEvent((ParagraphCreateEvent) event);
+ } else if (event instanceof ParagraphRemoveEvent) {
+ handleParagraphRemoveEvent((ParagraphRemoveEvent) event);
+ } else if (event instanceof ParagraphUpdateEvent) {
+ handleParagraphUpdateEvent((ParagraphUpdateEvent) event);
+ } else {
+ throw new RuntimeException("Unknown event: " + event.getClass().getSimpleName());
}
+ } catch (Exception e) {
+ LOGGER.error("Fail to handle NoteEvent", e);
}
}
}
- /**
- * Used for testing
- *
- * @throws InterruptedException
- */
- public void drainEvents() throws InterruptedException {
- while(!eventsQueue.isEmpty()) {
- Thread.sleep(1000);
- }
- Thread.sleep(5000);
+ public boolean isEventQueueEmpty() {
+ return executor.getQueue().isEmpty();
}
interface NoteEvent {
@@ -140,12 +139,10 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
}
public static class NoteCreateEvent implements NoteEvent {
- private Note note;
- private AuthenticationInfo subject;
+ private final Note note;
- public NoteCreateEvent(Note note, AuthenticationInfo subject) {
+ public NoteCreateEvent(Note note) {
this.note = note;
- this.subject = subject;
}
public Note getNote() {
@@ -154,12 +151,10 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
}
public static class NoteUpdateEvent implements NoteEvent {
- private Note note;
- private AuthenticationInfo subject;
+ private final Note note;
- public NoteUpdateEvent(Note note, AuthenticationInfo subject) {
+ public NoteUpdateEvent(Note note) {
this.note = note;
- this.subject = subject;
}
public Note getNote() {
@@ -169,12 +164,10 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
public static class NoteRemoveEvent implements NoteEvent {
- private Note note;
- private AuthenticationInfo subject;
+ private final Note note;
- public NoteRemoveEvent(Note note, AuthenticationInfo subject) {
+ public NoteRemoveEvent(Note note) {
this.note = note;
- this.subject = subject;
}
public Note getNote() {
@@ -183,7 +176,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
}
public static class ParagraphCreateEvent implements NoteEvent {
- private Paragraph p;
+ private final Paragraph p;
public ParagraphCreateEvent(Paragraph p) {
this.p = p;
@@ -195,7 +188,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
}
public static class ParagraphUpdateEvent implements NoteEvent {
- private Paragraph p;
+ private final Paragraph p;
public ParagraphUpdateEvent(Paragraph p) {
this.p = p;
@@ -207,7 +200,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
}
public static class ParagraphRemoveEvent implements NoteEvent {
- private Paragraph p;
+ private final Paragraph p;
public ParagraphRemoveEvent(Paragraph p) {
this.p = p;
@@ -219,7 +212,7 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
}
public static class ParagraphStatusChangeEvent implements NoteEvent {
- private Paragraph p;
+ private final Paragraph p;
public ParagraphStatusChangeEvent(Paragraph p) {
this.p = p;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
index 98456ce..3c3645c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
@@ -27,6 +27,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.lucene.analysis.Analyzer;
@@ -84,7 +85,7 @@ public class LuceneSearch extends SearchService {
@Inject
public LuceneSearch(ZeppelinConfiguration conf) {
- super("LuceneSearch-Thread");
+ super("LuceneSearch");
if (conf.isZeppelinSearchUseDisk()) {
try {
@@ -309,8 +310,8 @@ public class LuceneSearch extends SearchService {
}
@Override
- public void addParagraphIndex(Paragraph pararaph) throws IOException {
- updateDoc(pararaph.getNote().getId(), pararaph.getNote().getName(), pararaph);
+ public void addParagraphIndex(Paragraph paragraph) throws IOException {
+ updateDoc(paragraph.getNote().getId(), paragraph.getNote().getName(), paragraph);
}
/**
@@ -364,15 +365,19 @@ public class LuceneSearch extends SearchService {
} catch (IOException e) {
LOGGER.error("Failed to delete {} from index by '{}'", noteId, fullNoteOrJustParagraph, e);
}
- LOGGER.debug("Done, index contains {} docs now {}", indexWriter.numDocs());
+ LOGGER.debug("Done, index contains {} docs now", indexWriter.numDocs());
}
/* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#close()
*/
@Override
+ @PreDestroy
public void close() {
+ // First shut down the event-handling executor
+ super.close();
try {
+ // Second close the indexWriter
indexWriter.close();
} catch (IOException e) {
LOGGER.error("Failed to .close() the notebook index", e);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java
index 058e7c5..24af7f0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/NoSearchService.java
@@ -32,7 +32,7 @@ public class NoSearchService extends SearchService {
@Inject
public NoSearchService() {
- super("NoSearchService-Thread");
+ super("NoSearchService");
}
@Override
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
index 9ef0217..d17beea 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
@@ -25,6 +25,8 @@ import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteEventAsyncListener;
import org.apache.zeppelin.notebook.Paragraph;
+import javax.annotation.PreDestroy;
+
/**
* Search (both, indexing and query) the notes.
*
@@ -96,6 +98,7 @@ public abstract class SearchService extends NoteEventAsyncListener {
/**
* Frees the resources used by the index
*/
+ @PreDestroy
public void close() {
super.close();
}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
index dde2ceb..48d9e84 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
@@ -21,14 +21,14 @@ import static org.apache.zeppelin.search.LuceneSearch.formatId;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.google.common.base.Splitter;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
-import com.google.common.io.Files;
+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterSetting;
@@ -54,7 +54,7 @@ public class LuceneSearchTest {
@Before
public void startUp() throws IOException {
- indexDir = Files.createTempDir().getAbsoluteFile();
+ indexDir = Files.createTempDirectory("lucene").toFile();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SEARCH_INDEX_PATH.getVarName(), indexDir.getAbsolutePath());
noteSearchService = new LuceneSearch(ZeppelinConfiguration.create());
interpreterSettingManager = mock(InterpreterSettingManager.class);
@@ -62,9 +62,9 @@ public class LuceneSearchTest {
when(defaultInterpreterSetting.getName()).thenReturn("test");
when(interpreterSettingManager.getDefaultInterpreterSetting()).thenReturn(defaultInterpreterSetting);
notebook = new Notebook(ZeppelinConfiguration.create(), mock(AuthorizationService.class), mock(NotebookRepo.class), mock(NoteManager.class),
- mock(InterpreterFactory.class), interpreterSettingManager,
- noteSearchService,
- mock(Credentials.class), null);
+ mock(InterpreterFactory.class), interpreterSettingManager,
+ noteSearchService,
+ mock(Credentials.class), null);
}
@After
@@ -73,12 +73,19 @@ public class LuceneSearchTest {
indexDir.delete();
}
-// @Test
+ private void drainSearchEvents() throws InterruptedException {
+ while (!noteSearchService.isEventQueueEmpty()) {
+ Thread.sleep(1000);
+ }
+ Thread.sleep(1000);
+ }
+
+ @Test
public void canIndexAndQuery() throws IOException, InterruptedException {
// given
Note note1 = newNoteWithParagraph("Notebook1", "test");
Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all");
- noteSearchService.drainEvents();
+ drainSearchEvents();
// when
List<Map<String, String>> results = noteSearchService.query("all");
@@ -95,7 +102,7 @@ public class LuceneSearchTest {
// given
Note note1 = newNoteWithParagraph("Notebook1", "test");
Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all");
- noteSearchService.drainEvents();
+ drainSearchEvents();
// when
List<Map<String, String>> results = noteSearchService.query("Notebook1");
@@ -111,7 +118,7 @@ public class LuceneSearchTest {
// given
Note note1 = newNoteWithParagraph("Notebook1", "test", "testingTitleSearch");
Note note2 = newNoteWithParagraph("Notebook2", "not test", "notTestingTitleSearch");
- noteSearchService.drainEvents();
+ drainSearchEvents();
// when
List<Map<String, String>> results = noteSearchService.query("testingTitleSearch");
@@ -128,16 +135,15 @@ public class LuceneSearchTest {
assertThat(TitleHits).isAtLeast(1);
}
- //@Test
- public void indexKeyContract() throws IOException {
- // give
+ @Test
+ public void indexKeyContract() throws IOException, InterruptedException {
+ // given
Note note1 = newNoteWithParagraph("Notebook1", "test");
+ drainSearchEvents();
// when
- noteSearchService.addNoteIndex(note1);
- // then
String id = resultForQuery("test").get(0).get("id"); // LuceneSearch.ID_FIELD
-
- assertThat(Splitter.on("/").split(id)) // key structure <noteId>/paragraph/<paragraphId>
+ // then
+ assertThat(id.split("/")).asList() // key structure <noteId>/paragraph/<paragraphId>
.containsAllOf(
note1.getId(), "paragraph", note1.getLastParagraph().getId()); // LuceneSearch.PARAGRAPH
}
@@ -158,7 +164,7 @@ public class LuceneSearchTest {
// given
Note note1 = newNoteWithParagraph("Notebook1", "test");
Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all");
- noteSearchService.drainEvents();
+ drainSearchEvents();
// when
Paragraph p2 = note2.getLastParagraph();
@@ -187,7 +193,7 @@ public class LuceneSearchTest {
// given
Note note1 = newNoteWithParagraph("Notebook1", "test");
Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all");
- noteSearchService.drainEvents();
+ drainSearchEvents();
assertThat(resultForQuery("Notebook2")).isNotEmpty();
@@ -208,7 +214,7 @@ public class LuceneSearchTest {
// given: total 2 notebooks, 3 paragraphs
Note note1 = newNoteWithParagraph("Notebook1", "test");
Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all");
- noteSearchService.drainEvents();
+ drainSearchEvents();
assertThat(resultForQuery("test").size()).isEqualTo(3);
@@ -217,7 +223,7 @@ public class LuceneSearchTest {
p1.setText("no no no");
notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS);
p1.getNote().fireParagraphUpdateEvent(p1);
- noteSearchService.drainEvents();
+ drainSearchEvents();
// then
assertThat(resultForQuery("Notebook1").size()).isEqualTo(1);
@@ -237,13 +243,13 @@ public class LuceneSearchTest {
// given: total 2 notebooks, 3 paragraphs
Note note1 = newNoteWithParagraph("Notebook1", "test");
Note note2 = newNoteWithParagraphs("Notebook2", "not test", "not test at all");
- noteSearchService.drainEvents();
+ drainSearchEvents();
assertThat(resultForQuery("test").size()).isEqualTo(3);
// when
note1.setName("NotebookN");
notebook.updateNote(note1, AuthenticationInfo.ANONYMOUS);
- noteSearchService.drainEvents();
+ drainSearchEvents();
Thread.sleep(1000);
// then
assertThat(resultForQuery("Notebook1")).isEmpty();
@@ -297,7 +303,6 @@ public class LuceneSearchTest {
}
private Note newNote(String name) throws IOException {
- Note note = notebook.createNote(name, AuthenticationInfo.ANONYMOUS);
- return note;
+ return notebook.createNote(name, AuthenticationInfo.ANONYMOUS);
}
}