You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:34 UTC

[09/50] [abbrv] carbondata git commit: [CARBONDATA-2433] [Lucene GC Issue] Executor OOM because of GC when blocklet pruning is done using Lucene datamap

[CARBONDATA-2433] [Lucene GC Issue] Executor OOM because of GC when blocklet pruning is done using Lucene datamap

Problem
Executor OOM because of GC when blocklet pruning is done using Lucene datamap

Analysis
While seraching using lucene it creates a PriorityQueue to hold the documents. As size is not specified by default the PriorityQueue size is
equal to the number of lucene documents. As the docuemnts start getting added to the heap the GC time increases and after some time task fails due
to excessive GC and executor OOM occurs.
Reference blog: http://lucene.472066.n3.nabble.com/Optimization-of-memory-usage-in-PriorityQueue-td590355.html

Fix
Specify the limit for first search and after that use the searchAfter API to search in incremental order with gieven PriorityQueue size.

This closes #2267


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e011977
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e011977
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e011977

Branch: refs/heads/spark-2.3
Commit: 0e011977e5562f4b033f503bb53a845616523dc7
Parents: 061871e
Author: manishgupta88 <to...@gmail.com>
Authored: Thu May 3 20:40:41 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon May 21 19:12:54 2018 +0530

----------------------------------------------------------------------
 .../datamap/lucene/LuceneFineGrainDataMap.java  | 65 +++++++++++++++-----
 1 file changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e011977/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index 3645bb6..742f8d0 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -68,6 +68,15 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName());
 
   /**
+   * search limit will help in deciding the size of priority queue which is used by lucene to store
+   * the documents in heap. By default it is 100 means in one search max of 10 documents can be
+   * stored in heap by lucene. This way it will help in reducing the GC.
+   * Note: If it is removed or it's value is increased it will lead to almost 90%
+   * of the query time in GC in worst case scenarios if it's value is increased beyond a limit
+   */
+  private static final int SEARCH_LIMIT = 100;
+
+  /**
    * searcher object for this datamap
    */
   private Map<String, IndexSearcher> indexSearcherMap = null;
@@ -232,13 +241,23 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
     Map<String, Map<Integer, List<Short>>> mapBlocks = new HashMap<>();
 
+    long luceneSearchStartTime = System.currentTimeMillis();
     for (Map.Entry<String, IndexSearcher> searcherEntry : indexSearcherMap.entrySet()) {
       IndexSearcher indexSearcher = searcherEntry.getValue();
+      // take the min of total documents available in the reader and limit if set by the user
+      maxDocs = Math.min(maxDocs, indexSearcher.getIndexReader().maxDoc());
       // execute index search
       // initialize to null, else ScoreDoc objects will get accumulated in memory
       TopDocs result = null;
+      // the number of documents to be queried in one search. It will always be minimum of
+      // search result and maxDocs
+      int numberOfDocumentsToBeQueried = 0;
+      // counter for maintaining the total number of documents finished querying
+      int documentHitCounter = 0;
       try {
-        result = indexSearcher.search(query, maxDocs);
+        numberOfDocumentsToBeQueried = Math.min(maxDocs, SEARCH_LIMIT);
+        result = indexSearcher.search(query, numberOfDocumentsToBeQueried);
+        documentHitCounter += numberOfDocumentsToBeQueried;
       } catch (IOException e) {
         String errorMessage =
             String.format("failed to search lucene data, detail is %s", e.getMessage());
@@ -247,22 +266,38 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       }
 
       ByteBuffer intBuffer = ByteBuffer.allocate(4);
-
-      for (ScoreDoc scoreDoc : result.scoreDocs) {
-        // get a document
-        Document doc = indexSearcher.doc(scoreDoc.doc);
-
-        // get all fields
-        List<IndexableField> fieldsInDoc = doc.getFields();
-        if (writeCacheSize > 0) {
-          // It fills rowids to the map, its value is combined with multiple rows.
-          fillMapForCombineRows(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
-        } else {
-          // Fill rowids to the map
-          fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
+      // last scoreDoc in a result to be used in searchAfter API
+      ScoreDoc lastScoreDoc = null;
+      while (true) {
+        for (ScoreDoc scoreDoc : result.scoreDocs) {
+          // get a document
+          Document doc = indexSearcher.doc(scoreDoc.doc);
+          // get all fields
+          List<IndexableField> fieldsInDoc = doc.getFields();
+          if (writeCacheSize > 0) {
+            // It fills rowids to the map, its value is combined with multiple rows.
+            fillMapForCombineRows(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
+          } else {
+            // Fill rowids to the map
+            fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
+          }
+          lastScoreDoc = scoreDoc;
+        }
+        // result will have the total number of hits therefore we always need to query on the
+        // left over documents
+        int remainingHits = result.totalHits - documentHitCounter;
+        // break the loop if count reaches maxDocs to be searched or remaining hits become <=0
+        if (remainingHits <= 0 || documentHitCounter >= maxDocs) {
+          break;
         }
+        numberOfDocumentsToBeQueried = Math.min(remainingHits, SEARCH_LIMIT);
+        result = indexSearcher.searchAfter(lastScoreDoc, query, numberOfDocumentsToBeQueried);
+        documentHitCounter += numberOfDocumentsToBeQueried;
       }
     }
+    LOGGER.info(
+        "Time taken for lucene search: " + (System.currentTimeMillis() - luceneSearchStartTime)
+            + " ms");
 
     // result blocklets
     List<FineGrainBlocklet> blocklets = new ArrayList<>();
@@ -388,4 +423,4 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
   }
 
-}
+}
\ No newline at end of file