You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/18 05:13:23 UTC
[zeppelin] branch master updated: [ZEPPELIN-5153]. Too many
unnecessary indexing in LuceneSearch
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new e95af9f [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch
e95af9f is described below
commit e95af9fffeb6981493de49afe8354bf4d8472ce0
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Dec 17 21:41:51 2020 +0800
[ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch
### What is this PR for?
There is a lot of unnecessary indexing in LuceneSearch, which causes performance issues when there are lots of jobs running in Zeppelin. This PR fixes it.
* Only index the current paragraph when a paragraph is updated (previously, all the paragraphs of the note were re-indexed).
* Only index/update the note name index when a note is updated.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5153
### How should this be tested?
* Manually tested
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3992 from zjffdu/ZEPPELIN-5153 and squashes the following commits:
6ec0627f4 [Jeff Zhang] Fix comment
76fa0ec18 [Jeff Zhang] [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch
---
.../apache/zeppelin/service/NotebookService.java | 2 +-
.../apache/zeppelin/rest/ZeppelinRestApiTest.java | 2 +-
.../zeppelin/notebook/NoteEventAsyncListener.java | 20 +++--
.../org/apache/zeppelin/notebook/Notebook.java | 4 +
.../org/apache/zeppelin/search/LuceneSearch.java | 96 ++++++++--------------
.../org/apache/zeppelin/search/SearchService.java | 66 +++++++--------
.../apache/zeppelin/search/LuceneSearchTest.java | 12 +--
7 files changed, 90 insertions(+), 112 deletions(-)
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index c6f4124..44477fb 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -746,7 +746,7 @@ public class NotebookService {
schedulerService.refreshCron(note.getId());
}
- notebook.saveNote(note, context.getAutheInfo());
+ notebook.updateNote(note, context.getAutheInfo());
callback.onSuccess(note, context);
}
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index 073fe21..f032984 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -944,7 +944,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
"\"text\": \"ThisIsToTestSearchMethodWithTitle \"}";
CloseableHttpResponse postNoteText = httpPost("/notebook/" + note.getId() + "/paragraph", jsonRequest);
postNoteText.close();
- Thread.sleep(1000);
+ Thread.sleep(3000);
CloseableHttpResponse searchNote = httpGet("/notebook/search?q='testTitleSearchOfParagraph'");
Map<String, Object> respSearchResult = gson.fromJson(EntityUtils.toString(searchNote.getEntity(), StandardCharsets.UTF_8),
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
index b593673..97f799c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
@@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -28,6 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
*/
public abstract class NoteEventAsyncListener implements NoteEventListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(NoteEventAsyncListener.class);
+
private BlockingQueue<NoteEvent> eventsQueue = new LinkedBlockingQueue<>();
private Thread eventHandlerThread;
@@ -38,17 +42,17 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
this.eventHandlerThread.start();
}
- public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent);
+ public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception;
- public abstract void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent);
+ public abstract void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) throws Exception;
- public abstract void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent);
+ public abstract void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) throws Exception;
- public abstract void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent);
+ public abstract void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) throws Exception;
- public abstract void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent);
+ public abstract void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) throws Exception;
- public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent);
+ public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception;
public void close() {
@@ -112,8 +116,8 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
} else {
throw new RuntimeException("Unknown event: " + event.getClass().getSimpleName());
}
- } catch (InterruptedException e) {
- e.printStackTrace();
+ } catch (Exception e) {
+ LOGGER.error("Fail to handle NoteEvent", e);
}
}
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 39dea80..94614bb 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -377,6 +377,10 @@ public class Notebook {
public void saveNote(Note note, AuthenticationInfo subject) throws IOException {
noteManager.saveNote(note, subject);
+ }
+
+ public void updateNote(Note note, AuthenticationInfo subject) throws IOException {
+ noteManager.saveNote(note, subject);
fireNoteUpdateEvent(note, subject);
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
index f0ccce0..98456ce 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
@@ -22,12 +22,10 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.inject.Inject;
@@ -204,17 +202,14 @@ public class LuceneSearch extends SearchService {
* @see org.apache.zeppelin.search.Search#updateIndexDoc(org.apache.zeppelin.notebook.Note)
*/
@Override
- public void updateIndexDoc(Note note) throws IOException {
+ public void updateNoteIndex(Note note) throws IOException {
updateIndexNoteName(note);
- for (Paragraph p : note.getParagraphs()) {
- updateIndexParagraph(note, p);
- }
}
private void updateIndexNoteName(Note note) throws IOException {
String noteName = note.getName();
String noteId = note.getId();
- LOGGER.debug("Indexing Notebook {}, '{}'", noteId, noteName);
+ LOGGER.debug("Update note index: {}, '{}'", noteId, noteName);
if (null == noteName || noteName.isEmpty()) {
LOGGER.debug("Skipping empty notebook name");
return;
@@ -222,12 +217,10 @@ public class LuceneSearch extends SearchService {
updateDoc(noteId, noteName, null);
}
- private void updateIndexParagraph(Note note, Paragraph p) throws IOException {
- if (p.getText() == null) {
- LOGGER.debug("Skipping empty paragraph");
- return;
- }
- updateDoc(note.getId(), note.getName(), p);
+ @Override
+ public void updateParagraphIndex(Paragraph p) throws IOException {
+ LOGGER.debug("Update paragraph index: {}", p.getId());
+ updateDoc(p.getNote().getId(), p.getNote().getName(), p);
}
/**
@@ -288,7 +281,9 @@ public class LuceneSearch extends SearchService {
doc.add(new StringField("title", noteName, Field.Store.YES));
if (null != p) {
- doc.add(new TextField(SEARCH_FIELD_TEXT, p.getText(), Field.Store.YES));
+ if (p.getText() != null) {
+ doc.add(new TextField(SEARCH_FIELD_TEXT, p.getText(), Field.Store.YES));
+ }
if (p.getTitle() != null) {
doc.add(new TextField(SEARCH_FIELD_TITLE, p.getTitle(), Field.Store.YES));
}
@@ -301,38 +296,10 @@ public class LuceneSearch extends SearchService {
}
/* (non-Javadoc)
- * @see org.apache.zeppelin.search.Search#addIndexDocs(java.util.Collection)
- */
- @Override
- public void addIndexDocs(Collection<Note> collection) {
- int docsIndexed = 0;
- long start = System.nanoTime();
- try {
- for (Note note : collection) {
- addIndexDocAsync(note);
- docsIndexed++;
- }
- } catch (IOException e) {
- LOGGER.error("Failed to index all Notebooks", e);
- } finally {
- try { // save what's been indexed, even if not full collection
- indexWriter.commit();
- } catch (IOException e) {
- LOGGER.error("Failed to save index", e);
- }
- long end = System.nanoTime();
- LOGGER.info(
- "Indexing {} notebooks took {}ms",
- docsIndexed,
- TimeUnit.NANOSECONDS.toMillis(end - start));
- }
- }
-
- /* (non-Javadoc)
* @see org.apache.zeppelin.search.Search#addIndexDoc(org.apache.zeppelin.notebook.Note)
*/
@Override
- public void addIndexDoc(Note note) {
+ public void addNoteIndex(Note note) {
try {
addIndexDocAsync(note);
indexWriter.commit();
@@ -341,6 +308,11 @@ public class LuceneSearch extends SearchService {
}
}
+ @Override
+ public void addParagraphIndex(Paragraph pararaph) throws IOException {
+ updateDoc(pararaph.getNote().getId(), pararaph.getNote().getName(), pararaph);
+ }
+
/**
* Indexes the given notebook, but does not commit changes.
*
@@ -349,12 +321,8 @@ public class LuceneSearch extends SearchService {
*/
private void addIndexDocAsync(Note note) throws IOException {
indexNoteName(indexWriter, note.getId(), note.getName());
- for (Paragraph doc : note.getParagraphs()) {
- if (doc.getText() == null) {
- LOGGER.debug("Skipping empty paragraph");
- continue;
- }
- indexDoc(indexWriter, note.getId(), note.getName(), doc);
+ for (Paragraph paragraph : note.getParagraphs()) {
+ updateDoc(note.getId(), note.getName(), paragraph);
}
}
@@ -362,8 +330,14 @@ public class LuceneSearch extends SearchService {
* @see org.apache.zeppelin.search.Search#deleteIndexDocs(org.apache.zeppelin.notebook.Note)
*/
@Override
- public void deleteIndexDocs(String noteId) {
- deleteDoc(noteId, null);
+ public void deleteNoteIndex(Note note) {
+ if (note == null) {
+ return;
+ }
+ deleteDoc(note.getId(), null);
+ for (Paragraph paragraph : note.getParagraphs()) {
+ deleteParagraphIndex(note.getId(), paragraph);
+ }
}
/* (non-Javadoc)
@@ -371,10 +345,16 @@ public class LuceneSearch extends SearchService {
* #deleteIndexDoc(org.apache.zeppelin.notebook.Note, org.apache.zeppelin.notebook.Paragraph)
*/
@Override
- public void deleteIndexDoc(String noteId, Paragraph p) {
+ public void deleteParagraphIndex(String noteId, Paragraph p) {
deleteDoc(noteId, p);
}
+ /**
+ * Delete note index of paragraph index (when p is not null).
+ *
+ * @param noteId
+ * @param p
+ */
private void deleteDoc(String noteId, Paragraph p) {
String fullNoteOrJustParagraph = formatDeleteId(noteId, p);
LOGGER.debug("Deleting note {}, out of: {}", noteId, indexWriter.numDocs());
@@ -410,15 +390,7 @@ public class LuceneSearch extends SearchService {
LOGGER.debug("Skipping empty notebook name");
return;
}
- indexDoc(w, noteId, noteName, null);
- }
-
- /** Indexes a single document: - code of the paragraph (if non-null) - or just a note name */
- private void indexDoc(IndexWriter w, String noteId, String noteName, Paragraph p)
- throws IOException {
- String id = formatId(noteId, p);
- Document doc = newDocument(id, noteName, p);
- w.addDocument(doc);
+ updateDoc(noteId, noteName, null);
}
@Override
@@ -426,7 +398,7 @@ public class LuceneSearch extends SearchService {
Thread thread = new Thread(() -> {
LOGGER.info("Starting rebuild index");
notes.forEach(note -> {
- addIndexDoc(note);
+ addNoteIndex(note);
note.unLoad();
});
LOGGER.info("Finish rebuild index");
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
index bb4c189..9ef0217 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.search;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -48,33 +47,42 @@ public abstract class SearchService extends NoteEventAsyncListener {
public abstract List<Map<String, String>> query(String queryStr);
/**
- * Updates all documents in index for the given note:
- * - name
- * - all paragraphs
+ * Updates note index for the given note, only update index of note meta info,
+ * such as id,name. Paragraph index will be done in method updateParagraphIndex.
*
* @param note a Note to update index for
* @throws IOException
*/
- public abstract void updateIndexDoc(Note note) throws IOException;
+ public abstract void updateNoteIndex(Note note) throws IOException;
/**
- * Indexes full collection of notes: all the paragraphs + Note names
+ * Updates paragraph index for the given paragraph.
*
- * @param collection of Notes
+ * @param paragraph a Paragraph to update index for
+ * @throws IOException
*/
- public abstract void addIndexDocs(Collection<Note> collection);
+
+ public abstract void updateParagraphIndex(Paragraph paragraph) throws IOException;
/**
* Indexes the given note.
*
* @throws IOException If there is a low-level I/O error
*/
- public abstract void addIndexDoc(Note note);
+ public abstract void addNoteIndex(Note note) throws IOException;
+
+ /**
+ * Indexes the given paragraph.
+ *
+ * @throws IOException If there is a low-level I/O error
+ */
+ public abstract void addParagraphIndex(Paragraph pargaraph) throws IOException;
+
/**
* Deletes all docs on given Note from index
*/
- public abstract void deleteIndexDocs(String noteId);
+ public abstract void deleteNoteIndex(Note note) throws IOException;
/**
* Deletes doc for a given
@@ -83,7 +91,7 @@ public abstract class SearchService extends NoteEventAsyncListener {
* @param p
* @throws IOException
*/
- public abstract void deleteIndexDoc(String noteId, Paragraph p);
+ public abstract void deleteParagraphIndex(String noteId, Paragraph p) throws IOException;
/**
* Frees the recourses used by index
@@ -93,46 +101,34 @@ public abstract class SearchService extends NoteEventAsyncListener {
}
@Override
- public void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) {
- addIndexDoc(noteCreateEvent.getNote());
+ public void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception {
+ addNoteIndex(noteCreateEvent.getNote());
}
@Override
- public void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) {
- deleteIndexDocs(noteRemoveEvent.getNote().getId());
+ public void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) throws Exception {
+ deleteNoteIndex(noteRemoveEvent.getNote());
}
@Override
- public void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) {
- try {
- updateIndexDoc(noteUpdateEvent.getNote());
- } catch (IOException e) {
- e.printStackTrace();
- }
+ public void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) throws Exception {
+ updateNoteIndex(noteUpdateEvent.getNote());
}
@Override
- public void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) {
- try {
- updateIndexDoc(paragraphCreateEvent.getParagraph().getNote());
- } catch (IOException e) {
- e.printStackTrace();
- }
+ public void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) throws Exception {
+ addParagraphIndex(paragraphCreateEvent.getParagraph());
}
@Override
- public void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) {
+ public void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) throws Exception {
Paragraph p = paragraphRemoveEvent.getParagraph();
- deleteIndexDoc(p.getNote().getId(), p);
+ deleteParagraphIndex(p.getNote().getId(), p);
}
@Override
- public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) {
- try {
- updateIndexDoc(paragraphUpdateEvent.getParagraph().getNote());
- } catch (IOException e) {
- e.printStackTrace();
- }
+ public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception {
+ updateParagraphIndex(paragraphUpdateEvent.getParagraph());
}
public abstract void startRebuildIndex(Stream<Note> notes);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
index 7857021..dde2ceb 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
@@ -133,7 +133,7 @@ public class LuceneSearchTest {
// give
Note note1 = newNoteWithParagraph("Notebook1", "test");
// when
- noteSearchService.addIndexDoc(note1);
+ noteSearchService.addNoteIndex(note1);
// then
String id = resultForQuery("test").get(0).get("id"); // LuceneSearch.ID_FIELD
@@ -163,7 +163,8 @@ public class LuceneSearchTest {
// when
Paragraph p2 = note2.getLastParagraph();
p2.setText("test indeed");
- noteSearchService.updateIndexDoc(note2);
+ noteSearchService.updateNoteIndex(note2);
+ noteSearchService.updateParagraphIndex(p2);
// then
List<Map<String, String>> results = noteSearchService.query("all");
@@ -178,7 +179,7 @@ public class LuceneSearchTest {
// give
// looks like a bug in web UI: it tries to delete a note twice (after it has just been deleted)
// when
- noteSearchService.deleteIndexDocs(null);
+ noteSearchService.deleteNoteIndex(null);
}
@Test
@@ -191,7 +192,7 @@ public class LuceneSearchTest {
assertThat(resultForQuery("Notebook2")).isNotEmpty();
// when
- noteSearchService.deleteIndexDocs(note2.getId());
+ noteSearchService.deleteNoteIndex(note2);
// then
assertThat(noteSearchService.query("all")).isEmpty();
@@ -215,6 +216,7 @@ public class LuceneSearchTest {
Paragraph p1 = note1.getLastParagraph();
p1.setText("no no no");
notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS);
+ p1.getNote().fireParagraphUpdateEvent(p1);
noteSearchService.drainEvents();
// then
@@ -240,7 +242,7 @@ public class LuceneSearchTest {
// when
note1.setName("NotebookN");
- notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS);
+ notebook.updateNote(note1, AuthenticationInfo.ANONYMOUS);
noteSearchService.drainEvents();
Thread.sleep(1000);
// then