You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/02 20:45:39 UTC

[incubator-pinot] branch master updated: Generate inverted index in purge task if it exists (#4182)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e1e54cf  Generate inverted index in purge task if it exists (#4182)
e1e54cf is described below

commit e1e54cf9e6365641d0e08c87af22b3360938c102
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu May 2 13:45:28 2019 -0700

    Generate inverted index in purge task if it exists (#4182)
    
    If the purge task does not generate the inverted index, the server needs
    to generate it while loading the segment, which can potentially
    cause performance issues.
    
    TODO: unify the behavior of Pinot Hadoop, the segment converter and the
    purger
---
 .../apache/pinot/core/minion/SegmentPurger.java    | 30 ++++++++---
 .../pinot/core/minion/SegmentPurgerTest.java       | 61 +++++++++++++---------
 2 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 9581310..3e95b7e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -21,12 +21,14 @@ package org.apache.pinot.core.minion;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.segment.StarTreeMetadata;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
@@ -34,6 +36,8 @@ import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +69,7 @@ public class SegmentPurger {
 
   public File purgeSegment()
       throws Exception {
-    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
     String tableName = segmentMetadata.getTableName();
     String segmentName = segmentMetadata.getName();
     LOGGER.info("Start purging table: {}, segment: {}", tableName, segmentName);
@@ -81,7 +85,8 @@ public class SegmentPurger {
         return null;
       }
 
-      SegmentGeneratorConfig config = new SegmentGeneratorConfig(purgeRecordReader.getSchema());
+      Schema schema = purgeRecordReader.getSchema();
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
       config.setOutDir(_workingDir.getPath());
       config.setTableName(tableName);
       config.setSegmentName(segmentName);
@@ -100,14 +105,28 @@ public class SegmentPurger {
         config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
       }
 
+      // Generate inverted index if it exists in the original segment
+      // TODO: once the column metadata correctly reflects whether inverted index exists for the column, use that
+      //       instead of reading the segment
+      // TODO: uniform the behavior of Pinot Hadoop segment generation, segment converter and purger
+      List<String> invertedIndexCreationColumns = new ArrayList<>();
+      try (SegmentDirectory segmentDirectory = SegmentDirectory
+          .createFromLocalFS(_originalIndexDir, segmentMetadata, ReadMode.mmap);
+          SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+        for (String column : schema.getColumnNames()) {
+          if (reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX)) {
+            invertedIndexCreationColumns.add(column);
+          }
+        }
+      }
+      config.setInvertedIndexCreationColumns(invertedIndexCreationColumns);
+
       // Generate star-tree if it exists in the original segment
       StarTreeMetadata starTreeMetadata = segmentMetadata.getStarTreeMetadata();
       if (starTreeMetadata != null) {
         config.enableStarTreeIndex(StarTreeIndexSpec.fromStarTreeMetadata(starTreeMetadata));
       }
 
-      // TODO: currently we don't generate inverted index
-
       SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
       purgeRecordReader.rewind();
       driver.init(config, purgeRecordReader);
@@ -152,7 +171,6 @@ public class SegmentPurger {
 
     @Override
     public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
-
     }
 
     @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index c8b922b..81e41ac 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -20,23 +20,31 @@ package org.apache.pinot.core.minion;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
-import org.testng.Assert;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
 
 public class SegmentPurgerTest {
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentPurgerTest");
@@ -83,6 +91,7 @@ public class SegmentPurgerTest {
     config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
     config.setTableName(TABLE_NAME);
     config.setSegmentName(SEGMENT_NAME);
+    config.setInvertedIndexCreationColumns(Collections.singletonList(D1));
 
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     driver.init(config, genericRowRecordReader);
@@ -94,23 +103,15 @@ public class SegmentPurgerTest {
   public void testPurgeSegment()
       throws Exception {
     // Purge records with d1 = 0
-    SegmentPurger.RecordPurger recordPurger = new SegmentPurger.RecordPurger() {
-      @Override
-      public boolean shouldPurge(GenericRow row) {
-        return row.getValue(D1).equals(0);
-      }
-    };
-
-    // Modify records with d2 = 0 to d2 = 100
-    SegmentPurger.RecordModifier recordModifier = new SegmentPurger.RecordModifier() {
-      @Override
-      public boolean modifyRecord(GenericRow row) {
-        if (row.getValue(D2).equals(0)) {
-          row.putField(D2, Integer.MAX_VALUE);
-          return true;
-        } else {
-          return false;
-        }
+    SegmentPurger.RecordPurger recordPurger = row -> row.getValue(D1).equals(0);
+
+    // Modify records with d2 = 0 to d2 = Integer.MAX_VALUE
+    SegmentPurger.RecordModifier recordModifier = row -> {
+      if (row.getValue(D2).equals(0)) {
+        row.putField(D2, Integer.MAX_VALUE);
+        return true;
+      } else {
+        return false;
       }
     };
 
@@ -119,14 +120,14 @@ public class SegmentPurgerTest {
     File purgedIndexDir = segmentPurger.purgeSegment();
 
     // Check the purge/modify counter in segment purger
-    Assert.assertEquals(segmentPurger.getNumRecordsPurged(), _expectedNumRecordsPurged);
-    Assert.assertEquals(segmentPurger.getNumRecordsModified(), _expectedNumRecordsModified);
+    assertEquals(segmentPurger.getNumRecordsPurged(), _expectedNumRecordsPurged);
+    assertEquals(segmentPurger.getNumRecordsModified(), _expectedNumRecordsModified);
 
     // Check crc and index creation time
     SegmentMetadataImpl purgedSegmentMetadata = new SegmentMetadataImpl(purgedIndexDir);
     SegmentMetadataImpl originalSegmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
-    Assert.assertFalse(purgedSegmentMetadata.getCrc().equals(originalSegmentMetadata.getCrc()));
-    Assert.assertEquals(purgedSegmentMetadata.getIndexCreationTime(), originalSegmentMetadata.getIndexCreationTime());
+    assertNotEquals(purgedSegmentMetadata.getCrc(), originalSegmentMetadata.getCrc());
+    assertEquals(purgedSegmentMetadata.getIndexCreationTime(), originalSegmentMetadata.getIndexCreationTime());
 
     try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(purgedIndexDir)) {
       int numRecordsRemaining = 0;
@@ -137,8 +138,8 @@ public class SegmentPurgerTest {
         row = pinotSegmentRecordReader.next(row);
 
         // Purged segment should not have any record with d1 = 0 or d2 = 0
-        Assert.assertFalse(row.getValue(D1).equals(0));
-        Assert.assertFalse(row.getValue(D2).equals(0));
+        assertNotEquals(row.getValue(D1), 0);
+        assertNotEquals(row.getValue(D2), 0);
 
         numRecordsRemaining++;
         if (row.getValue(D2).equals(Integer.MAX_VALUE)) {
@@ -146,8 +147,16 @@ public class SegmentPurgerTest {
         }
       }
 
-      Assert.assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
-      Assert.assertEquals(numRecordsModified, _expectedNumRecordsModified);
+      assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
+      assertEquals(numRecordsModified, _expectedNumRecordsModified);
+    }
+
+    // Check inverted index
+    try (SegmentDirectory segmentDirectory = SegmentDirectory
+        .createFromLocalFS(purgedIndexDir, purgedSegmentMetadata, ReadMode.mmap);
+        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+      assertTrue(reader.hasIndexFor(D1, ColumnIndexType.INVERTED_INDEX));
+      assertFalse(reader.hasIndexFor(D2, ColumnIndexType.INVERTED_INDEX));
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org