You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/03 22:14:22 UTC
[pinot] branch master updated: [Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 38ac70a9c1 [Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062)
38ac70a9c1 is described below
commit 38ac70a9c17c647dbbad090ca9fd60faebac5418
Author: deemoliu <qi...@uber.com>
AuthorDate: Mon Oct 3 15:14:16 2022 -0700
[Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062)
---
...adataAndDictionaryAggregationPlanMakerTest.java | 2 +-
.../immutable/ImmutableSegmentImpl.java | 62 +++++++++
.../upsert/BasePartitionUpsertMetadataManager.java | 34 ++++-
.../upsert/BaseTableUpsertMetadataManager.java | 3 +
...oncurrentMapPartitionUpsertMetadataManager.java | 4 +-
.../ConcurrentMapTableUpsertMetadataManager.java | 2 +-
.../pinot/segment/local/upsert/UpsertUtils.java | 45 +++++--
.../ImmutableSegmentImplUpsertSnapshotTest.java | 148 +++++++++++++++++++++
...rrentMapPartitionUpsertMetadataManagerTest.java | 58 ++++++--
.../org/apache/pinot/segment/spi/V1Constants.java | 1 +
.../pinot/spi/config/table/UpsertConfig.java | 11 ++
11 files changed, 347 insertions(+), 23 deletions(-)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index a887d3751b..f6df4e9b4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -130,7 +130,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- "daysSinceEpoch", HashFunction.NONE, null, serverMetrics), new ThreadSafeMutableRoaringBitmap());
+ "daysSinceEpoch", HashFunction.NONE, null, false, serverMetrics), new ThreadSafeMutableRoaringBitmap());
}
@AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index c84454a041..de6555ff43 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -19,12 +19,17 @@
package org.apache.pinot.segment.local.indexsegment.immutable;
import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
@@ -34,6 +39,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -43,7 +49,10 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,6 +102,59 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
_validDocIds = validDocIds;
}
+ @Nullable
+ public MutableRoaringBitmap loadValidDocIdsFromSnapshot() {
+ File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
+ if (validDocIdsSnapshotFile.exists()) {
+ try {
+ byte[] bytes = FileUtils.readFileToByteArray(validDocIdsSnapshotFile);
+ MutableRoaringBitmap validDocIds = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes)).toMutableRoaringBitmap();
+ LOGGER.info("Loaded valid doc ids for segment: {} with: {} valid docs", getSegmentName(),
+ validDocIds.getCardinality());
+ return validDocIds;
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while loading valid doc ids from snapshot file: {}, ignoring the snapshot",
+ validDocIdsSnapshotFile, e); // trailing Throwable: logged with its stack trace instead of being dropped
+ }
+ }
+ return null;
+ }
+
+ public void persistValidDocIdsSnapshot(MutableRoaringBitmap validDocIds) {
+ File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
+ try {
+ if (validDocIdsSnapshotFile.exists()) {
+ FileUtils.delete(validDocIdsSnapshotFile);
+ }
+ try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(validDocIdsSnapshotFile))) {
+ validDocIds.serialize(dataOutputStream);
+ }
+ LOGGER.info("Persisted valid doc ids for segment: {} with: {} valid docs", getSegmentName(),
+ validDocIds.getCardinality());
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while persisting valid doc ids to snapshot file: {}, skipping",
+ validDocIdsSnapshotFile, e); // trailing Throwable: logged with its stack trace instead of being dropped
+ }
+ }
+
+ public void deleteValidDocIdsSnapshot() {
+ File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
+ if (validDocIdsSnapshotFile.exists()) {
+ try {
+ FileUtils.delete(validDocIdsSnapshotFile);
+ LOGGER.info("Deleted valid doc ids snapshot for segment: {}", getSegmentName());
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while deleting valid doc ids snapshot file: {}, skipping",
+ validDocIdsSnapshotFile, e); // trailing Throwable: logged with its stack trace instead of being dropped
+ }
+ }
+ }
+
+ private File getValidDocIdsSnapshotFile() {
+ return new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
+ V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+ }
+
@Override
public Dictionary getDictionary(String column) {
ColumnIndexContainer container = _indexContainerMap.get(column);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 41f6fa9f3c..7bcb33cf19 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -54,6 +54,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final String _comparisonColumn;
protected final HashFunction _hashFunction;
protected final PartialUpsertHandler _partialUpsertHandler;
+ protected final boolean _enableSnapshot;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
@@ -65,13 +66,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
- @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
+ @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
_comparisonColumn = comparisonColumn;
_hashFunction = hashFunction;
_partialUpsertHandler = partialUpsertHandler;
+ _enableSnapshot = enableSnapshot;
_serverMetrics = serverMetrics;
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
}
@@ -83,9 +85,25 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@Override
public void addSegment(ImmutableSegment segment) {
- addSegment(segment, null, null);
+ Iterator<RecordInfo> recordInfoIterator = null;
+ if (segment instanceof ImmutableSegmentImpl) {
+ if (_enableSnapshot) {
+ MutableRoaringBitmap validDocIds = ((ImmutableSegmentImpl) segment).loadValidDocIdsFromSnapshot();
+ if (validDocIds != null) {
+ recordInfoIterator =
+ UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn, validDocIds);
+ }
+ } else {
+ ((ImmutableSegmentImpl) segment).deleteValidDocIdsSnapshot();
+ }
+ }
+ addSegment(segment, null, recordInfoIterator);
}
+ /**
+ * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
+ * validDocIds should always be empty.
+ */
@VisibleForTesting
public void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable Iterator<RecordInfo> recordInfoIterator) {
@@ -133,6 +151,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
replaceSegment(segment, null, null, oldSegment);
}
+ /**
+ * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
+ * validDocIds should always be empty.
+ */
@VisibleForTesting
public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
@@ -157,6 +179,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
if (validDocIds == null) {
validDocIds = new ThreadSafeMutableRoaringBitmap();
}
+ // New segment doesn't necessarily have the same docs as the old segment.
+ // Even for consuming segment, we might re-order the docs.
+ // As a result, we iterate all docIds of the new segment instead of loading it from old segment's snapshot.
if (recordInfoIterator == null) {
recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
}
@@ -215,6 +240,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
try {
MutableRoaringBitmap validDocIds =
segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+
+ if (_enableSnapshot && segment instanceof ImmutableSegmentImpl && validDocIds != null) {
+ ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
+ }
+
if (validDocIds == null || validDocIds.isEmpty()) {
_logger.info("Skip removing segment without valid docs: {}", segmentName);
return;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 95666d3ea2..7147341b69 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -38,6 +38,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected String _comparisonColumn;
protected HashFunction _hashFunction;
protected PartialUpsertHandler _partialUpsertHandler;
+ protected boolean _enableSnapshot;
protected ServerMetrics _serverMetrics;
@Override
@@ -69,6 +70,8 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
_comparisonColumn);
}
+ _enableSnapshot = upsertConfig.isEnableSnapshot();
+
_serverMetrics = serverMetrics;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 125f61fe47..611153fd13 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -58,9 +58,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
- @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
+ @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumn, hashFunction, partialUpsertHandler,
- serverMetrics);
+ enableSnapshot, serverMetrics);
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 9c22316703..3f830bb3a7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,7 +36,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
- _comparisonColumn, _hashFunction, _partialUpsertHandler, _serverMetrics));
+ _comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index a7ea1b92f7..a1ee7b3266 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -24,6 +24,8 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
@SuppressWarnings("rawtypes")
@@ -32,7 +34,7 @@ public class UpsertUtils {
}
/**
- * Returns an iterator of {@link RecordInfo} from the segment.
+ * Returns an iterator of {@link RecordInfo} for all the documents from the segment.
*/
public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment, List<String> primaryKeyColumns,
String comparisonColumn) {
@@ -47,18 +49,45 @@ public class UpsertUtils {
@Override
public RecordInfo next() {
- PrimaryKey primaryKey = new PrimaryKey(new Object[primaryKeyColumns.size()]);
- getPrimaryKey(segment, primaryKeyColumns, _docId, primaryKey);
+ return getRecordInfo(segment, primaryKeyColumns, comparisonColumn, _docId++);
+ }
+ };
+ }
+
+ /**
+ * Returns an iterator of {@link RecordInfo} for the valid documents from the segment.
+ */
+ public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment, List<String> primaryKeyColumns,
+ String comparisonColumn, MutableRoaringBitmap validDocIds) {
+ return new Iterator<RecordInfo>() {
+ private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator();
- Object comparisonValue = segment.getValue(_docId, comparisonColumn);
- if (comparisonValue instanceof byte[]) {
- comparisonValue = new ByteArray((byte[]) comparisonValue);
- }
- return new RecordInfo(primaryKey, _docId++, (Comparable) comparisonValue);
+ @Override
+ public boolean hasNext() {
+ return _docIdIterator.hasNext();
+ }
+
+ @Override
+ public RecordInfo next() {
+ return getRecordInfo(segment, primaryKeyColumns, comparisonColumn, _docIdIterator.next());
}
};
}
+ /**
+ * Reads a {@link RecordInfo} from the segment.
+ */
+ public static RecordInfo getRecordInfo(ImmutableSegment segment, List<String> primaryKeyColumns,
+ String comparisonColumn, int docId) {
+ PrimaryKey primaryKey = new PrimaryKey(new Object[primaryKeyColumns.size()]);
+ getPrimaryKey(segment, primaryKeyColumns, docId, primaryKey);
+ Object comparisonValue = segment.getValue(docId, comparisonColumn);
+ if (comparisonValue instanceof byte[]) {
+ comparisonValue = new ByteArray((byte[]) comparisonValue);
+ }
+ return new RecordInfo(primaryKey, docId, (Comparable) comparisonValue);
+ }
+
/**
* Reads a primary key from the segment.
*/
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
new file mode 100644
index 0000000000..6c31c3981f
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.immutable;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class ImmutableSegmentImplUpsertSnapshotTest {
+ private static final String AVRO_FILE = "data/test_data-mv.avro";
+ private static final String SCHEMA = "data/testDataMVSchema.json";
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ImmutableSegmentImplTest");
+
+ private SegmentDirectory _segmentDirectory;
+ private SegmentMetadataImpl _segmentMetadata;
+ private PinotConfiguration _configuration;
+ private TableConfig _tableConfig;
+ private Schema _schema;
+
+ private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ private ImmutableSegmentImpl _immutableSegmentImpl;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
+ _configuration = new PinotConfiguration(props);
+
+ _segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(INDEX_DIR.toURI(),
+ new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+
+ URL resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(AVRO_FILE);
+ Assert.assertNotNull(resourceUrl);
+ File avroFile = new File(resourceUrl.getFile());
+
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setRowTimeValueCheck(false);
+ ingestionConfig.setSegmentTimeValueCheck(false);
+
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setEnableSnapshot(true);
+
+ _tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
+ .setIngestionConfig(ingestionConfig).setUpsertConfig(upsertConfig).build();
+
+ resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(SCHEMA);
+ _schema = Schema.fromFile(new File(resourceUrl.getFile()));
+
+ SegmentGeneratorConfig config =
+ SegmentTestUtils.getSegmentGeneratorConfigWithSchema(avroFile, INDEX_DIR, "testTable", _tableConfig, _schema);
+
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config);
+ driver.build();
+ _segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
+ Mockito.when(_segmentMetadata.getColumnMetadataMap()).thenReturn(new HashMap<>());
+ Mockito.when(_segmentMetadata.getIndexDir()).thenReturn(INDEX_DIR);
+ _immutableSegmentImpl = new ImmutableSegmentImpl(_segmentDirectory, _segmentMetadata, new HashMap<>(), null);
+
+ ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
+ _partitionUpsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
+ "daysSinceEpoch", HashFunction.NONE, null, true, serverMetrics);
+
+ _immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap());
+ }
+
+ @Test
+ public void testPersistValidDocIdsSnapshot() {
+ int[] docIds1 = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
+ MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
+ validDocIds.add(docIds1);
+
+ _immutableSegmentImpl.persistValidDocIdsSnapshot(validDocIds);
+ assertTrue(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
+ V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
+
+ MutableRoaringBitmap bitmap = _immutableSegmentImpl.loadValidDocIdsFromSnapshot();
+ assertEquals(bitmap.toArray(), docIds1);
+
+ _immutableSegmentImpl.deleteValidDocIdsSnapshot();
+ assertFalse(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
+ V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ FileUtils.deleteQuietly(INDEX_DIR);
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 5a6e45573c..3df1f45caa 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -37,6 +38,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -55,15 +57,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@Test
public void testAddReplaceRemoveSegment() {
- verifyAddReplaceRemoveSegment(HashFunction.NONE);
- verifyAddReplaceRemoveSegment(HashFunction.MD5);
- verifyAddReplaceRemoveSegment(HashFunction.MURMUR3);
+ verifyAddReplaceRemoveSegment(HashFunction.NONE, false);
+ verifyAddReplaceRemoveSegment(HashFunction.MD5, false);
+ verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, false);
+ verifyAddReplaceRemoveSegment(HashFunction.NONE, true);
+ verifyAddReplaceRemoveSegment(HashFunction.MD5, true);
+ verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true);
}
- private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) {
+ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- "timeCol", hashFunction, null, mock(ServerMetrics.class));
+ "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -73,7 +78,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, primaryKeys1);
- List<RecordInfo> recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+ List<RecordInfo> recordInfoList1;
+ if (enableSnapshot) {
+ // get recordInfo from validDocIdSnapshot.
+ // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ int[] docIds1 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+ recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps);
+ } else {
+ // get recordInfo by iterating all records.
+ recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+ }
upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
assertEquals(recordLocationMap.size(), 3);
@@ -88,8 +104,20 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
timestamps = new int[]{100, 100, 120, 80, 80};
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
- upsertMetadataManager.addSegment(segment2, validDocIds2,
- getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
+ List<RecordInfo> recordInfoList2;
+ if (enableSnapshot) {
+ // get recordInfo from validDocIdSnapshot.
+ // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // segment1 snapshot: 1 -> {4, 120}
+ MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+ validDocIdsSnapshot2.add(new int[]{0, 2, 3});
+ recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps);
+ } else {
+ // get recordInfo by iterating all records.
+ recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+ }
+ upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator());
+
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
assertEquals(recordLocationMap.size(), 4);
@@ -174,6 +202,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
return recordInfoList;
}
+ /**
+ * Get recordInfo from validDocIdsSnapshot (enableSnapshot = true).
+ */
+ private List<RecordInfo> getRecordInfoList(MutableRoaringBitmap validDocIdsSnapshot, int[] primaryKeys,
+ int[] timestamps) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ Iterator<Integer> validDocIdsIterator = validDocIdsSnapshot.iterator();
+ validDocIdsIterator.forEachRemaining((docId) -> recordInfoList.add(
+ new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, new IntWrapper(timestamps[docId]))));
+ return recordInfoList;
+ }
+
private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] primaryKeys) {
List<PrimaryKey> primaryKeyList = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
@@ -233,7 +273,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddRecord(HashFunction hashFunction) {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- "timeCol", hashFunction, null, mock(ServerMetrics.class));
+ "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 3a99c99346..c9d4213b63 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -25,6 +25,7 @@ public class V1Constants {
public static final String SEGMENT_CREATION_META = "creation.meta";
public static final String INDEX_MAP_FILE_NAME = "index_map";
public static final String INDEX_FILE_NAME = "columns.psf";
+ public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";
public static class Str {
public static final char DEFAULT_STRING_PAD_CHAR = '\0';
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 162e6b3030..ae0522f6cd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -54,6 +54,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Column for upsert comparison, default to time column")
private String _comparisonColumn;
+ @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
+ private boolean _enableSnapshot;
+
@JsonPropertyDescription("Custom class for upsert metadata manager")
private String _metadataManagerClass;
@@ -111,6 +114,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _comparisonColumn;
}
+ public boolean isEnableSnapshot() {
+ return _enableSnapshot;
+ }
+
@Nullable
public String getMetadataManagerClass() {
return _metadataManagerClass;
@@ -153,6 +160,10 @@ public class UpsertConfig extends BaseJsonConfig {
_comparisonColumn = comparisonColumn;
}
+ public void setEnableSnapshot(boolean enableSnapshot) {
+ _enableSnapshot = enableSnapshot;
+ }
+
public void setMetadataManagerClass(String metadataManagerClass) {
_metadataManagerClass = metadataManagerClass;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org