You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/02 00:43:33 UTC

[incubator-pinot] branch purge_inverted_index created (now 313dacd)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch purge_inverted_index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 313dacd  Generate inverted index in purge task if it exists

This branch includes the following new commits:

     new 313dacd  Generate inverted index in purge task if it exists

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Generate inverted index in purge task if it exists

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch purge_inverted_index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 313dacd4d565b5d69e40f6df2d9bd3cf396f4ad7
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Wed May 1 17:41:26 2019 -0700

    Generate inverted index in purge task if it exists
    
    If the purge task does not generate the inverted index, the server needs
    to generate it while loading the segment, which can potentially
    cause performance issues.
    
    TODO: unify the behavior of Pinot Hadoop, the segment converter and
    the purger
---
 .../apache/pinot/core/minion/SegmentPurger.java    | 30 ++++++++---
 .../pinot/core/minion/SegmentPurgerTest.java       | 61 +++++++++++++---------
 2 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 9581310..3e95b7e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -21,12 +21,14 @@ package org.apache.pinot.core.minion;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.segment.StarTreeMetadata;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
@@ -34,6 +36,8 @@ import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +69,7 @@ public class SegmentPurger {
 
   public File purgeSegment()
       throws Exception {
-    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
     String tableName = segmentMetadata.getTableName();
     String segmentName = segmentMetadata.getName();
     LOGGER.info("Start purging table: {}, segment: {}", tableName, segmentName);
@@ -81,7 +85,8 @@ public class SegmentPurger {
         return null;
       }
 
-      SegmentGeneratorConfig config = new SegmentGeneratorConfig(purgeRecordReader.getSchema());
+      Schema schema = purgeRecordReader.getSchema();
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
       config.setOutDir(_workingDir.getPath());
       config.setTableName(tableName);
       config.setSegmentName(segmentName);
@@ -100,14 +105,28 @@ public class SegmentPurger {
         config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
       }
 
+      // Generate inverted index if it exists in the original segment
+      // TODO: once the column metadata correctly reflects whether inverted index exists for the column, use that
+      //       instead of reading the segment
+      // TODO: uniform the behavior of Pinot Hadoop segment generation, segment converter and purger
+      List<String> invertedIndexCreationColumns = new ArrayList<>();
+      try (SegmentDirectory segmentDirectory = SegmentDirectory
+          .createFromLocalFS(_originalIndexDir, segmentMetadata, ReadMode.mmap);
+          SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+        for (String column : schema.getColumnNames()) {
+          if (reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX)) {
+            invertedIndexCreationColumns.add(column);
+          }
+        }
+      }
+      config.setInvertedIndexCreationColumns(invertedIndexCreationColumns);
+
       // Generate star-tree if it exists in the original segment
       StarTreeMetadata starTreeMetadata = segmentMetadata.getStarTreeMetadata();
       if (starTreeMetadata != null) {
         config.enableStarTreeIndex(StarTreeIndexSpec.fromStarTreeMetadata(starTreeMetadata));
       }
 
-      // TODO: currently we don't generate inverted index
-
       SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
       purgeRecordReader.rewind();
       driver.init(config, purgeRecordReader);
@@ -152,7 +171,6 @@ public class SegmentPurger {
 
     @Override
     public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
-
     }
 
     @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index c8b922b..81e41ac 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -20,23 +20,31 @@ package org.apache.pinot.core.minion;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
-import org.testng.Assert;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
 
 public class SegmentPurgerTest {
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentPurgerTest");
@@ -83,6 +91,7 @@ public class SegmentPurgerTest {
     config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
     config.setTableName(TABLE_NAME);
     config.setSegmentName(SEGMENT_NAME);
+    config.setInvertedIndexCreationColumns(Collections.singletonList(D1));
 
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     driver.init(config, genericRowRecordReader);
@@ -94,23 +103,15 @@ public class SegmentPurgerTest {
   public void testPurgeSegment()
       throws Exception {
     // Purge records with d1 = 0
-    SegmentPurger.RecordPurger recordPurger = new SegmentPurger.RecordPurger() {
-      @Override
-      public boolean shouldPurge(GenericRow row) {
-        return row.getValue(D1).equals(0);
-      }
-    };
-
-    // Modify records with d2 = 0 to d2 = 100
-    SegmentPurger.RecordModifier recordModifier = new SegmentPurger.RecordModifier() {
-      @Override
-      public boolean modifyRecord(GenericRow row) {
-        if (row.getValue(D2).equals(0)) {
-          row.putField(D2, Integer.MAX_VALUE);
-          return true;
-        } else {
-          return false;
-        }
+    SegmentPurger.RecordPurger recordPurger = row -> row.getValue(D1).equals(0);
+
+    // Modify records with d2 = 0 to d2 = Integer.MAX_VALUE
+    SegmentPurger.RecordModifier recordModifier = row -> {
+      if (row.getValue(D2).equals(0)) {
+        row.putField(D2, Integer.MAX_VALUE);
+        return true;
+      } else {
+        return false;
       }
     };
 
@@ -119,14 +120,14 @@ public class SegmentPurgerTest {
     File purgedIndexDir = segmentPurger.purgeSegment();
 
     // Check the purge/modify counter in segment purger
-    Assert.assertEquals(segmentPurger.getNumRecordsPurged(), _expectedNumRecordsPurged);
-    Assert.assertEquals(segmentPurger.getNumRecordsModified(), _expectedNumRecordsModified);
+    assertEquals(segmentPurger.getNumRecordsPurged(), _expectedNumRecordsPurged);
+    assertEquals(segmentPurger.getNumRecordsModified(), _expectedNumRecordsModified);
 
     // Check crc and index creation time
     SegmentMetadataImpl purgedSegmentMetadata = new SegmentMetadataImpl(purgedIndexDir);
     SegmentMetadataImpl originalSegmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
-    Assert.assertFalse(purgedSegmentMetadata.getCrc().equals(originalSegmentMetadata.getCrc()));
-    Assert.assertEquals(purgedSegmentMetadata.getIndexCreationTime(), originalSegmentMetadata.getIndexCreationTime());
+    assertNotEquals(purgedSegmentMetadata.getCrc(), originalSegmentMetadata.getCrc());
+    assertEquals(purgedSegmentMetadata.getIndexCreationTime(), originalSegmentMetadata.getIndexCreationTime());
 
     try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(purgedIndexDir)) {
       int numRecordsRemaining = 0;
@@ -137,8 +138,8 @@ public class SegmentPurgerTest {
         row = pinotSegmentRecordReader.next(row);
 
         // Purged segment should not have any record with d1 = 0 or d2 = 0
-        Assert.assertFalse(row.getValue(D1).equals(0));
-        Assert.assertFalse(row.getValue(D2).equals(0));
+        assertNotEquals(row.getValue(D1), 0);
+        assertNotEquals(row.getValue(D2), 0);
 
         numRecordsRemaining++;
         if (row.getValue(D2).equals(Integer.MAX_VALUE)) {
@@ -146,8 +147,16 @@ public class SegmentPurgerTest {
         }
       }
 
-      Assert.assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
-      Assert.assertEquals(numRecordsModified, _expectedNumRecordsModified);
+      assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
+      assertEquals(numRecordsModified, _expectedNumRecordsModified);
+    }
+
+    // Check inverted index
+    try (SegmentDirectory segmentDirectory = SegmentDirectory
+        .createFromLocalFS(purgedIndexDir, purgedSegmentMetadata, ReadMode.mmap);
+        SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+      assertTrue(reader.hasIndexFor(D1, ColumnIndexType.INVERTED_INDEX));
+      assertFalse(reader.hasIndexFor(D2, ColumnIndexType.INVERTED_INDEX));
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org