You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/02 20:45:39 UTC
[incubator-pinot] branch master updated: Generate inverted index in
purge task if it exists (#4182)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e1e54cf Generate inverted index in purge task if it exists (#4182)
e1e54cf is described below
commit e1e54cf9e6365641d0e08c87af22b3360938c102
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu May 2 13:45:28 2019 -0700
Generate inverted index in purge task if it exists (#4182)
If the purge task does not generate the inverted index, the server needs
to generate it while loading the segment, which can potentially
cause performance issues.
TODO: unify the behavior of Pinot Hadoop, segment converter and
purger
---
.../apache/pinot/core/minion/SegmentPurger.java | 30 ++++++++---
.../pinot/core/minion/SegmentPurgerTest.java | 61 +++++++++++++---------
2 files changed, 59 insertions(+), 32 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 9581310..3e95b7e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -21,12 +21,14 @@ package org.apache.pinot.core.minion;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.segment.StarTreeMetadata;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
@@ -34,6 +36,8 @@ import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +69,7 @@ public class SegmentPurger {
public File purgeSegment()
throws Exception {
- SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
String tableName = segmentMetadata.getTableName();
String segmentName = segmentMetadata.getName();
LOGGER.info("Start purging table: {}, segment: {}", tableName, segmentName);
@@ -81,7 +85,8 @@ public class SegmentPurger {
return null;
}
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(purgeRecordReader.getSchema());
+ Schema schema = purgeRecordReader.getSchema();
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
config.setOutDir(_workingDir.getPath());
config.setTableName(tableName);
config.setSegmentName(segmentName);
@@ -100,14 +105,28 @@ public class SegmentPurger {
config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
}
+ // Generate inverted index if it exists in the original segment
+ // TODO: once the column metadata correctly reflects whether inverted index exists for the column, use that
+ // instead of reading the segment
+ // TODO: uniform the behavior of Pinot Hadoop segment generation, segment converter and purger
+ List<String> invertedIndexCreationColumns = new ArrayList<>();
+ try (SegmentDirectory segmentDirectory = SegmentDirectory
+ .createFromLocalFS(_originalIndexDir, segmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+ for (String column : schema.getColumnNames()) {
+ if (reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX)) {
+ invertedIndexCreationColumns.add(column);
+ }
+ }
+ }
+ config.setInvertedIndexCreationColumns(invertedIndexCreationColumns);
+
// Generate star-tree if it exists in the original segment
StarTreeMetadata starTreeMetadata = segmentMetadata.getStarTreeMetadata();
if (starTreeMetadata != null) {
config.enableStarTreeIndex(StarTreeIndexSpec.fromStarTreeMetadata(starTreeMetadata));
}
- // TODO: currently we don't generate inverted index
-
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
purgeRecordReader.rewind();
driver.init(config, purgeRecordReader);
@@ -152,7 +171,6 @@ public class SegmentPurger {
@Override
public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
-
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index c8b922b..81e41ac 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -20,23 +20,31 @@ package org.apache.pinot.core.minion;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
-import org.testng.Assert;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
public class SegmentPurgerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentPurgerTest");
@@ -83,6 +91,7 @@ public class SegmentPurgerTest {
config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
config.setTableName(TABLE_NAME);
config.setSegmentName(SEGMENT_NAME);
+ config.setInvertedIndexCreationColumns(Collections.singletonList(D1));
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config, genericRowRecordReader);
@@ -94,23 +103,15 @@ public class SegmentPurgerTest {
public void testPurgeSegment()
throws Exception {
// Purge records with d1 = 0
- SegmentPurger.RecordPurger recordPurger = new SegmentPurger.RecordPurger() {
- @Override
- public boolean shouldPurge(GenericRow row) {
- return row.getValue(D1).equals(0);
- }
- };
-
- // Modify records with d2 = 0 to d2 = 100
- SegmentPurger.RecordModifier recordModifier = new SegmentPurger.RecordModifier() {
- @Override
- public boolean modifyRecord(GenericRow row) {
- if (row.getValue(D2).equals(0)) {
- row.putField(D2, Integer.MAX_VALUE);
- return true;
- } else {
- return false;
- }
+ SegmentPurger.RecordPurger recordPurger = row -> row.getValue(D1).equals(0);
+
+ // Modify records with d2 = 0 to d2 = Integer.MAX_VALUE
+ SegmentPurger.RecordModifier recordModifier = row -> {
+ if (row.getValue(D2).equals(0)) {
+ row.putField(D2, Integer.MAX_VALUE);
+ return true;
+ } else {
+ return false;
}
};
@@ -119,14 +120,14 @@ public class SegmentPurgerTest {
File purgedIndexDir = segmentPurger.purgeSegment();
// Check the purge/modify counter in segment purger
- Assert.assertEquals(segmentPurger.getNumRecordsPurged(), _expectedNumRecordsPurged);
- Assert.assertEquals(segmentPurger.getNumRecordsModified(), _expectedNumRecordsModified);
+ assertEquals(segmentPurger.getNumRecordsPurged(), _expectedNumRecordsPurged);
+ assertEquals(segmentPurger.getNumRecordsModified(), _expectedNumRecordsModified);
// Check crc and index creation time
SegmentMetadataImpl purgedSegmentMetadata = new SegmentMetadataImpl(purgedIndexDir);
SegmentMetadataImpl originalSegmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
- Assert.assertFalse(purgedSegmentMetadata.getCrc().equals(originalSegmentMetadata.getCrc()));
- Assert.assertEquals(purgedSegmentMetadata.getIndexCreationTime(), originalSegmentMetadata.getIndexCreationTime());
+ assertNotEquals(purgedSegmentMetadata.getCrc(), originalSegmentMetadata.getCrc());
+ assertEquals(purgedSegmentMetadata.getIndexCreationTime(), originalSegmentMetadata.getIndexCreationTime());
try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(purgedIndexDir)) {
int numRecordsRemaining = 0;
@@ -137,8 +138,8 @@ public class SegmentPurgerTest {
row = pinotSegmentRecordReader.next(row);
// Purged segment should not have any record with d1 = 0 or d2 = 0
- Assert.assertFalse(row.getValue(D1).equals(0));
- Assert.assertFalse(row.getValue(D2).equals(0));
+ assertNotEquals(row.getValue(D1), 0);
+ assertNotEquals(row.getValue(D2), 0);
numRecordsRemaining++;
if (row.getValue(D2).equals(Integer.MAX_VALUE)) {
@@ -146,8 +147,16 @@ public class SegmentPurgerTest {
}
}
- Assert.assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
- Assert.assertEquals(numRecordsModified, _expectedNumRecordsModified);
+ assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
+ assertEquals(numRecordsModified, _expectedNumRecordsModified);
+ }
+
+ // Check inverted index
+ try (SegmentDirectory segmentDirectory = SegmentDirectory
+ .createFromLocalFS(purgedIndexDir, purgedSegmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
+ assertTrue(reader.hasIndexFor(D1, ColumnIndexType.INVERTED_INDEX));
+ assertFalse(reader.hasIndexFor(D2, ColumnIndexType.INVERTED_INDEX));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org