You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/18 05:13:23 UTC

[zeppelin] branch master updated: [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new e95af9f  [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch
e95af9f is described below

commit e95af9fffeb6981493de49afe8354bf4d8472ce0
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Dec 17 21:41:51 2020 +0800

    [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch
    
    ### What is this PR for?
    
    There is a lot of unnecessary indexing in LuceneSearch, which causes performance issues when there are lots of jobs running in Zeppelin. This PR fixes it.
    * Only index the current paragraph when a paragraph is updated. (Previously, all the paragraphs of the note were re-indexed.)
    * Only index/update the note name index when a note is updated.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5153
    
    ### How should this be tested?
    * Manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3992 from zjffdu/ZEPPELIN-5153 and squashes the following commits:
    
    6ec0627f4 [Jeff Zhang] Fix comment
    76fa0ec18 [Jeff Zhang] [ZEPPELIN-5153]. Too many unnecessary indexing in LuceneSearch
---
 .../apache/zeppelin/service/NotebookService.java   |  2 +-
 .../apache/zeppelin/rest/ZeppelinRestApiTest.java  |  2 +-
 .../zeppelin/notebook/NoteEventAsyncListener.java  | 20 +++--
 .../org/apache/zeppelin/notebook/Notebook.java     |  4 +
 .../org/apache/zeppelin/search/LuceneSearch.java   | 96 ++++++++--------------
 .../org/apache/zeppelin/search/SearchService.java  | 66 +++++++--------
 .../apache/zeppelin/search/LuceneSearchTest.java   | 12 +--
 7 files changed, 90 insertions(+), 112 deletions(-)

diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index c6f4124..44477fb 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -746,7 +746,7 @@ public class NotebookService {
       schedulerService.refreshCron(note.getId());
     }
 
-    notebook.saveNote(note, context.getAutheInfo());
+    notebook.updateNote(note, context.getAutheInfo());
     callback.onSuccess(note, context);
   }
 
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
index 073fe21..f032984 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java
@@ -944,7 +944,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi {
               "\"text\": \"ThisIsToTestSearchMethodWithTitle \"}";
       CloseableHttpResponse postNoteText = httpPost("/notebook/" + note.getId() + "/paragraph", jsonRequest);
       postNoteText.close();
-      Thread.sleep(1000);
+      Thread.sleep(3000);
 
       CloseableHttpResponse searchNote = httpGet("/notebook/search?q='testTitleSearchOfParagraph'");
       Map<String, Object> respSearchResult = gson.fromJson(EntityUtils.toString(searchNote.getEntity(), StandardCharsets.UTF_8),
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
index b593673..97f799c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventAsyncListener.java
@@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook;
 
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -28,6 +30,8 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
 public abstract class NoteEventAsyncListener implements NoteEventListener {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(NoteEventAsyncListener.class);
+
   private BlockingQueue<NoteEvent> eventsQueue = new LinkedBlockingQueue<>();
 
   private Thread eventHandlerThread;
@@ -38,17 +42,17 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
     this.eventHandlerThread.start();
   }
 
-  public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent);
+  public abstract void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception;
 
-  public abstract void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent);
+  public abstract void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) throws Exception;
 
-  public abstract void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent);
+  public abstract void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) throws Exception;
 
-  public abstract void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent);
+  public abstract void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) throws Exception;
 
-  public abstract void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent);
+  public abstract void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) throws Exception;
 
-  public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent);
+  public abstract void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception;
 
 
   public void close() {
@@ -112,8 +116,8 @@ public abstract class NoteEventAsyncListener implements NoteEventListener {
           } else {
             throw new RuntimeException("Unknown event: " + event.getClass().getSimpleName());
           }
-        } catch (InterruptedException e) {
-          e.printStackTrace();
+        } catch (Exception e) {
+          LOGGER.error("Fail to handle NoteEvent", e);
         }
       }
     }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 39dea80..94614bb 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -377,6 +377,10 @@ public class Notebook {
 
   public void saveNote(Note note, AuthenticationInfo subject) throws IOException {
     noteManager.saveNote(note, subject);
+  }
+
+  public void updateNote(Note note, AuthenticationInfo subject) throws IOException {
+    noteManager.saveNote(note, subject);
     fireNoteUpdateEvent(note, subject);
   }
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
index f0ccce0..98456ce 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java
@@ -22,12 +22,10 @@ import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import javax.inject.Inject;
 
@@ -204,17 +202,14 @@ public class LuceneSearch extends SearchService {
    * @see org.apache.zeppelin.search.Search#updateIndexDoc(org.apache.zeppelin.notebook.Note)
    */
   @Override
-  public void updateIndexDoc(Note note) throws IOException {
+  public void updateNoteIndex(Note note) throws IOException {
     updateIndexNoteName(note);
-    for (Paragraph p : note.getParagraphs()) {
-      updateIndexParagraph(note, p);
-    }
   }
 
   private void updateIndexNoteName(Note note) throws IOException {
     String noteName = note.getName();
     String noteId = note.getId();
-    LOGGER.debug("Indexing Notebook {}, '{}'", noteId, noteName);
+    LOGGER.debug("Update note index: {}, '{}'", noteId, noteName);
     if (null == noteName || noteName.isEmpty()) {
       LOGGER.debug("Skipping empty notebook name");
       return;
@@ -222,12 +217,10 @@ public class LuceneSearch extends SearchService {
     updateDoc(noteId, noteName, null);
   }
 
-  private void updateIndexParagraph(Note note, Paragraph p) throws IOException {
-    if (p.getText() == null) {
-      LOGGER.debug("Skipping empty paragraph");
-      return;
-    }
-    updateDoc(note.getId(), note.getName(), p);
+  @Override
+  public void updateParagraphIndex(Paragraph p) throws IOException {
+    LOGGER.debug("Update paragraph index: {}", p.getId());
+    updateDoc(p.getNote().getId(), p.getNote().getName(), p);
   }
 
   /**
@@ -288,7 +281,9 @@ public class LuceneSearch extends SearchService {
     doc.add(new StringField("title", noteName, Field.Store.YES));
 
     if (null != p) {
-      doc.add(new TextField(SEARCH_FIELD_TEXT, p.getText(), Field.Store.YES));
+      if (p.getText() != null) {
+        doc.add(new TextField(SEARCH_FIELD_TEXT, p.getText(), Field.Store.YES));
+      }
       if (p.getTitle() != null) {
         doc.add(new TextField(SEARCH_FIELD_TITLE, p.getTitle(), Field.Store.YES));
       }
@@ -301,38 +296,10 @@ public class LuceneSearch extends SearchService {
   }
 
   /* (non-Javadoc)
-   * @see org.apache.zeppelin.search.Search#addIndexDocs(java.util.Collection)
-   */
-  @Override
-  public void addIndexDocs(Collection<Note> collection) {
-    int docsIndexed = 0;
-    long start = System.nanoTime();
-    try {
-      for (Note note : collection) {
-        addIndexDocAsync(note);
-        docsIndexed++;
-      }
-    } catch (IOException e) {
-      LOGGER.error("Failed to index all Notebooks", e);
-    } finally {
-      try { // save what's been indexed, even if not full collection
-        indexWriter.commit();
-      } catch (IOException e) {
-        LOGGER.error("Failed to save index", e);
-      }
-      long end = System.nanoTime();
-      LOGGER.info(
-          "Indexing {} notebooks took {}ms",
-          docsIndexed,
-          TimeUnit.NANOSECONDS.toMillis(end - start));
-    }
-  }
-
-  /* (non-Javadoc)
    * @see org.apache.zeppelin.search.Search#addIndexDoc(org.apache.zeppelin.notebook.Note)
    */
   @Override
-  public void addIndexDoc(Note note) {
+  public void addNoteIndex(Note note) {
     try {
       addIndexDocAsync(note);
       indexWriter.commit();
@@ -341,6 +308,11 @@ public class LuceneSearch extends SearchService {
     }
   }
 
+  @Override
+  public void addParagraphIndex(Paragraph pararaph) throws IOException {
+    updateDoc(pararaph.getNote().getId(), pararaph.getNote().getName(), pararaph);
+  }
+
   /**
    * Indexes the given notebook, but does not commit changes.
    *
@@ -349,12 +321,8 @@ public class LuceneSearch extends SearchService {
    */
   private void addIndexDocAsync(Note note) throws IOException {
     indexNoteName(indexWriter, note.getId(), note.getName());
-    for (Paragraph doc : note.getParagraphs()) {
-      if (doc.getText() == null) {
-        LOGGER.debug("Skipping empty paragraph");
-        continue;
-      }
-      indexDoc(indexWriter, note.getId(), note.getName(), doc);
+    for (Paragraph paragraph : note.getParagraphs()) {
+      updateDoc(note.getId(), note.getName(), paragraph);
     }
   }
 
@@ -362,8 +330,14 @@ public class LuceneSearch extends SearchService {
    * @see org.apache.zeppelin.search.Search#deleteIndexDocs(org.apache.zeppelin.notebook.Note)
    */
   @Override
-  public void deleteIndexDocs(String noteId) {
-    deleteDoc(noteId, null);
+  public void deleteNoteIndex(Note note) {
+    if (note == null) {
+      return;
+    }
+    deleteDoc(note.getId(), null);
+    for (Paragraph paragraph : note.getParagraphs()) {
+      deleteParagraphIndex(note.getId(), paragraph);
+    }
   }
 
   /* (non-Javadoc)
@@ -371,10 +345,16 @@ public class LuceneSearch extends SearchService {
    *  #deleteIndexDoc(org.apache.zeppelin.notebook.Note, org.apache.zeppelin.notebook.Paragraph)
    */
   @Override
-  public void deleteIndexDoc(String noteId, Paragraph p) {
+  public void deleteParagraphIndex(String noteId, Paragraph p) {
     deleteDoc(noteId, p);
   }
 
+  /**
+   * Deletes the note index, or the paragraph index when p is not null.
+   *
+   * @param noteId
+   * @param p
+   */
   private void deleteDoc(String noteId, Paragraph p) {
     String fullNoteOrJustParagraph = formatDeleteId(noteId, p);
     LOGGER.debug("Deleting note {}, out of: {}", noteId, indexWriter.numDocs());
@@ -410,15 +390,7 @@ public class LuceneSearch extends SearchService {
       LOGGER.debug("Skipping empty notebook name");
       return;
     }
-    indexDoc(w, noteId, noteName, null);
-  }
-
-  /** Indexes a single document: - code of the paragraph (if non-null) - or just a note name */
-  private void indexDoc(IndexWriter w, String noteId, String noteName, Paragraph p)
-      throws IOException {
-    String id = formatId(noteId, p);
-    Document doc = newDocument(id, noteName, p);
-    w.addDocument(doc);
+    updateDoc(noteId, noteName, null);
   }
 
   @Override
@@ -426,7 +398,7 @@ public class LuceneSearch extends SearchService {
     Thread thread = new Thread(() -> {
       LOGGER.info("Starting rebuild index");
       notes.forEach(note -> {
-        addIndexDoc(note);
+        addNoteIndex(note);
         note.unLoad();
       });
       LOGGER.info("Finish rebuild index");
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
index bb4c189..9ef0217 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java
@@ -17,7 +17,6 @@
 package org.apache.zeppelin.search;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -48,33 +47,42 @@ public abstract class SearchService extends NoteEventAsyncListener {
   public abstract List<Map<String, String>> query(String queryStr);
 
   /**
-   * Updates all documents in index for the given note:
-   *  - name
-   *  - all paragraphs
+   * Updates note index for the given note, only update index of note meta info,
+   * such as id,name. Paragraph index will be done in method updateParagraphIndex.
    *
    * @param note a Note to update index for
    * @throws IOException
    */
-  public abstract void updateIndexDoc(Note note) throws IOException;
+  public abstract void updateNoteIndex(Note note) throws IOException;
 
   /**
-   * Indexes full collection of notes: all the paragraphs + Note names
+   * Updates paragraph index for the given paragraph.
    *
-   * @param collection of Notes
+   * @param paragraph a Paragraph to update index for
+   * @throws IOException
    */
-  public abstract void addIndexDocs(Collection<Note> collection);
+
+  public abstract void updateParagraphIndex(Paragraph paragraph) throws IOException;
 
   /**
    * Indexes the given note.
    *
    * @throws IOException If there is a low-level I/O error
    */
-  public abstract void addIndexDoc(Note note);
+  public abstract void addNoteIndex(Note note) throws IOException;
+
+  /**
+   * Indexes the given paragraph.
+   *
+   * @throws IOException If there is a low-level I/O error
+   */
+  public abstract void addParagraphIndex(Paragraph pargaraph) throws IOException;
+
 
   /**
    * Deletes all docs on given Note from index
    */
-  public abstract void deleteIndexDocs(String noteId);
+  public abstract void deleteNoteIndex(Note note) throws IOException;
 
   /**
    * Deletes doc for a given
@@ -83,7 +91,7 @@ public abstract class SearchService extends NoteEventAsyncListener {
    * @param p
    * @throws IOException
    */
-  public abstract void deleteIndexDoc(String noteId, Paragraph p);
+  public abstract void deleteParagraphIndex(String noteId, Paragraph p) throws IOException;
 
   /**
    * Frees the recourses used by index
@@ -93,46 +101,34 @@ public abstract class SearchService extends NoteEventAsyncListener {
   }
 
   @Override
-  public void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) {
-    addIndexDoc(noteCreateEvent.getNote());
+  public void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) throws Exception {
+    addNoteIndex(noteCreateEvent.getNote());
   }
 
   @Override
-  public void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) {
-    deleteIndexDocs(noteRemoveEvent.getNote().getId());
+  public void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) throws Exception {
+    deleteNoteIndex(noteRemoveEvent.getNote());
   }
 
   @Override
-  public void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) {
-    try {
-      updateIndexDoc(noteUpdateEvent.getNote());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+  public void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) throws Exception {
+    updateNoteIndex(noteUpdateEvent.getNote());
   }
 
   @Override
-  public void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) {
-    try {
-      updateIndexDoc(paragraphCreateEvent.getParagraph().getNote());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+  public void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) throws Exception {
+    addParagraphIndex(paragraphCreateEvent.getParagraph());
   }
 
   @Override
-  public void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) {
+  public void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) throws Exception {
     Paragraph p = paragraphRemoveEvent.getParagraph();
-    deleteIndexDoc(p.getNote().getId(), p);
+    deleteParagraphIndex(p.getNote().getId(), p);
   }
 
   @Override
-  public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) {
-    try {
-      updateIndexDoc(paragraphUpdateEvent.getParagraph().getNote());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+  public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) throws Exception {
+    updateParagraphIndex(paragraphUpdateEvent.getParagraph());
   }
 
   public abstract void startRebuildIndex(Stream<Note> notes);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
index 7857021..dde2ceb 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java
@@ -133,7 +133,7 @@ public class LuceneSearchTest {
     // give
     Note note1 = newNoteWithParagraph("Notebook1", "test");
     // when
-    noteSearchService.addIndexDoc(note1);
+    noteSearchService.addNoteIndex(note1);
     // then
     String id = resultForQuery("test").get(0).get("id"); // LuceneSearch.ID_FIELD
 
@@ -163,7 +163,8 @@ public class LuceneSearchTest {
     // when
     Paragraph p2 = note2.getLastParagraph();
     p2.setText("test indeed");
-    noteSearchService.updateIndexDoc(note2);
+    noteSearchService.updateNoteIndex(note2);
+    noteSearchService.updateParagraphIndex(p2);
 
     // then
     List<Map<String, String>> results = noteSearchService.query("all");
@@ -178,7 +179,7 @@ public class LuceneSearchTest {
     // give
     // looks like a bug in web UI: it tries to delete a note twice (after it has just been deleted)
     // when
-    noteSearchService.deleteIndexDocs(null);
+    noteSearchService.deleteNoteIndex(null);
   }
 
   @Test
@@ -191,7 +192,7 @@ public class LuceneSearchTest {
     assertThat(resultForQuery("Notebook2")).isNotEmpty();
 
     // when
-    noteSearchService.deleteIndexDocs(note2.getId());
+    noteSearchService.deleteNoteIndex(note2);
 
     // then
     assertThat(noteSearchService.query("all")).isEmpty();
@@ -215,6 +216,7 @@ public class LuceneSearchTest {
     Paragraph p1 = note1.getLastParagraph();
     p1.setText("no no no");
     notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS);
+    p1.getNote().fireParagraphUpdateEvent(p1);
     noteSearchService.drainEvents();
 
     // then
@@ -240,7 +242,7 @@ public class LuceneSearchTest {
 
     // when
     note1.setName("NotebookN");
-    notebook.saveNote(note1, AuthenticationInfo.ANONYMOUS);
+    notebook.updateNote(note1, AuthenticationInfo.ANONYMOUS);
     noteSearchService.drainEvents();
     Thread.sleep(1000);
     // then