You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/22 18:59:18 UTC
[pinot] branch master updated: Take upsert snapshot when creating new consuming segment (#10928)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3c92b32711 Take upsert snapshot when creating new consuming segment (#10928)
3c92b32711 is described below
commit 3c92b327114c4534f354e8a8f2755197a6439fd5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jun 22 11:59:10 2023 -0700
Take upsert snapshot when creating new consuming segment (#10928)
---
.../realtime/LLRealtimeSegmentDataManager.java | 31 ++--
.../immutable/ImmutableSegmentImpl.java | 9 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 158 +++++++++++++++------
.../upsert/PartitionUpsertMetadataManager.java | 6 +
.../ImmutableSegmentImplUpsertSnapshotTest.java | 149 -------------------
...rrentMapPartitionUpsertMetadataManagerTest.java | 17 ++-
6 files changed, 161 insertions(+), 209 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0e1a70bfc2..99f3c6d802 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -232,6 +232,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// consuming.
private final AtomicBoolean _acquiredConsumerSemaphore;
private final ServerMetrics _serverMetrics;
+ private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final BooleanSupplier _isReadyToConsumeData;
private final MutableSegmentImpl _realtimeSegment;
private volatile StreamPartitionMsgOffset _currentOffset;
@@ -400,13 +401,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// anymore. Remove the file if it exists.
removeSegmentFile();
- if (!_isReadyToConsumeData.getAsBoolean()) {
- do {
- //noinspection BusyWait
- Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
- } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
- }
-
_numRowsErrored = 0;
final long idlePipeSleepTimeMillis = 100;
final long idleTimeoutMillis = _partitionLevelStreamConfig.getIdleTimeoutMillis();
@@ -662,6 +656,27 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
long catchUpTimeMillis = 0L;
_startTimeMs = now();
try {
+ if (!_isReadyToConsumeData.getAsBoolean()) {
+ do {
+ //noinspection BusyWait
+ Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+ } while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean());
+ }
+
+ // TODO:
+ // When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is
+ // no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It
+ // can potentially cause the following problems:
+ // 1. The snapshot for the previous consuming segment might not be taken since it is not persisted yet
+ // 2. If the previous consuming segment is dropped but immutable segment is not downloaded and replaced yet,
+ // it might cause inconsistency (especially for partial upsert because events are not consumed in sequence)
+ // To address this problem, we should consider releasing the consumer semaphore after the consuming segment is
+ // persisted.
+ // Take upsert snapshot before starting consuming events
+ if (_partitionUpsertMetadataManager != null) {
+ _partitionUpsertMetadataManager.takeSnapshot();
+ }
+
while (!_state.isFinal()) {
if (_state.shouldConsume()) {
consumeLoop(); // Consume until we reached the end criteria, or we are stopped.
@@ -863,7 +878,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
if (_partitionMetadataProvider == null) {
createPartitionMetadataProvider("Get Partition Lag State");
}
- ;
return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
}
@@ -1309,6 +1323,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_indexLoadingConfig = indexLoadingConfig;
_schema = schema;
_serverMetrics = serverMetrics;
+ _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_isReadyToConsumeData = isReadyToConsumeData;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 1e00f85077..66875eb726 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -123,7 +123,7 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
return null;
}
- public void persistValidDocIdsSnapshot(MutableRoaringBitmap validDocIds) {
+ public void persistValidDocIdsSnapshot() {
File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
try {
if (validDocIdsSnapshotFile.exists()) {
@@ -132,14 +132,15 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
return;
}
}
+ MutableRoaringBitmap validDocIdsSnapshot = _validDocIds.getMutableRoaringBitmap();
try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(validDocIdsSnapshotFile))) {
- validDocIds.serialize(dataOutputStream);
+ validDocIdsSnapshot.serialize(dataOutputStream);
}
LOGGER.info("Persisted valid doc ids for segment: {} with: {} valid docs", getSegmentName(),
- validDocIds.getCardinality());
+ validDocIdsSnapshot.getCardinality());
} catch (Exception e) {
LOGGER.warn("Caught exception while persisting valid doc ids to snapshot file: {}, skipping",
- validDocIdsSnapshotFile);
+ validDocIdsSnapshotFile, e);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 6556ffce8a..1501cd28e6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
@@ -61,8 +63,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
- @VisibleForTesting
- public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
+ // Tracks all the segments managed by this manager (excluding EmptySegment)
+ protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet();
+
+ // NOTE: We do not persist snapshot on the first consuming segment because most segments might not be loaded yet
+ protected volatile boolean _gotFirstConsumingSegment = false;
+ protected final ReadWriteLock _snapshotLock;
protected volatile boolean _stopped = false;
// Initialize with 1 pending operation to indicate the metadata manager can take more operations
@@ -81,6 +87,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_hashFunction = hashFunction;
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
+ _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
}
@@ -92,44 +99,51 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@Override
public void addSegment(ImmutableSegment segment) {
+ String segmentName = segment.getSegmentName();
if (_stopped) {
_logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
return;
}
+ if (segment instanceof EmptyIndexSegment) {
+ _logger.info("Skip adding empty segment: {}", segmentName);
+ return;
+ }
+ Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+ "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+ _tableNameWithType);
+
+ if (_enableSnapshot) {
+ _snapshotLock.readLock().lock();
+ }
startOperation();
try {
- doAddSegment(segment);
+ doAddSegment((ImmutableSegmentImpl) segment);
+ _trackedSegments.add(segment);
} finally {
finishOperation();
+ if (_enableSnapshot) {
+ _snapshotLock.readLock().unlock();
+ }
}
}
- protected void doAddSegment(ImmutableSegment segment) {
+ protected void doAddSegment(ImmutableSegmentImpl segment) {
String segmentName = segment.getSegmentName();
_logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
+ long startTimeMs = System.currentTimeMillis();
- if (segment instanceof EmptyIndexSegment) {
- _logger.info("Skip adding empty segment: {}", segmentName);
- return;
- }
-
- Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
- "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
- _tableNameWithType);
-
- ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) segment;
MutableRoaringBitmap validDocIds;
if (_enableSnapshot) {
- validDocIds = immutableSegmentImpl.loadValidDocIdsFromSnapshot();
+ validDocIds = segment.loadValidDocIdsFromSnapshot();
if (validDocIds != null && validDocIds.isEmpty()) {
_logger.info("Skip adding segment: {} without valid doc, current primary key count: {}",
segment.getSegmentName(), getNumPrimaryKeys());
- immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
+ segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
return;
}
} else {
validDocIds = null;
- immutableSegmentImpl.deleteValidDocIdsSnapshot();
+ segment.deleteValidDocIdsSnapshot();
}
try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
@@ -141,7 +155,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
recordInfoIterator =
UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
}
- addSegment(immutableSegmentImpl, null, recordInfoIterator);
+ addSegment(segment, null, recordInfoIterator);
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while adding segment: %s, table: %s", segmentName, _tableNameWithType), e);
@@ -152,7 +166,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);
- _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+ _logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName,
+ System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
/**
@@ -188,9 +203,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
segment.getSegmentName());
return;
}
+
+ // NOTE: We don't acquire snapshot read lock here because snapshot is always taken before a new consuming segment
+ // starts consuming, so it won't overlap with this method
+ _gotFirstConsumingSegment = true;
startOperation();
try {
doAddRecord(segment, recordInfo);
+ _trackedSegments.add(segment);
} finally {
finishOperation();
}
@@ -204,11 +224,22 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_logger.info("Skip replacing segment: {} because metadata manager is already stopped", segment.getSegmentName());
return;
}
+
+ if (_enableSnapshot) {
+ _snapshotLock.readLock().lock();
+ }
startOperation();
try {
doReplaceSegment(segment, oldSegment);
+ if (!(segment instanceof EmptyIndexSegment)) {
+ _trackedSegments.add(segment);
+ }
+ _trackedSegments.remove(oldSegment);
} finally {
finishOperation();
+ if (_enableSnapshot) {
+ _snapshotLock.readLock().unlock();
+ }
}
}
@@ -219,6 +250,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_tableNameWithType, oldSegment.getSegmentName(), segmentName);
_logger.info("Replacing {} segment: {}, current primary key count: {}",
oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+ long startTimeMs = System.currentTimeMillis();
if (segment instanceof EmptyIndexSegment) {
_logger.info("Skip adding empty segment: {}", segmentName);
@@ -241,7 +273,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);
- _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+ _logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", segmentName,
+ System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
/**
@@ -288,10 +321,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
} finally {
segmentLock.unlock();
}
-
- if (!(oldSegment instanceof EmptyIndexSegment)) {
- _replacedSegments.add(oldSegment);
- }
}
protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
@@ -299,29 +328,27 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@Override
public void removeSegment(IndexSegment segment) {
String segmentName = segment.getSegmentName();
- if (_replacedSegments.remove(segment)) {
- _logger.info("Skip removing replaced segment: {}", segmentName);
+ if (_stopped) {
+ _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
return;
}
- // Allow persisting valid doc ids snapshot after metadata manager is stopped
- MutableRoaringBitmap validDocIds =
- segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (_enableSnapshot && segment instanceof ImmutableSegmentImpl) {
- ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
- }
- if (validDocIds == null || validDocIds.isEmpty()) {
- _logger.info("Skip removing segment without valid docs: {}", segmentName);
+ if (!_trackedSegments.contains(segment)) {
+ _logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName);
return;
}
- if (_stopped) {
- _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
- return;
+
+ if (_enableSnapshot) {
+ _snapshotLock.readLock().lock();
}
startOperation();
try {
doRemoveSegment(segment);
+ _trackedSegments.remove(segment);
} finally {
finishOperation();
+ if (_enableSnapshot) {
+ _snapshotLock.readLock().unlock();
+ }
}
}
@@ -329,13 +356,18 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
String segmentName = segment.getSegmentName();
_logger.info("Removing {} segment: {}, current primary key count: {}",
segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+ long startTimeMs = System.currentTimeMillis();
+
+ MutableRoaringBitmap validDocIds =
+ segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+ if (validDocIds == null || validDocIds.isEmpty()) {
+ _logger.info("Skip removing segment without valid docs: {}", segmentName);
+ return;
+ }
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
try {
- assert segment.getValidDocIds() != null;
- MutableRoaringBitmap validDocIds = segment.getValidDocIds().getMutableRoaringBitmap();
- assert validDocIds != null;
_logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
removeSegment(segment, validDocIds);
} finally {
@@ -347,7 +379,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
numPrimaryKeys);
- _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+ _logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", segmentName,
+ System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}
@Override
@@ -360,6 +393,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_logger.debug("Skip updating record because metadata manager is already stopped");
return record;
}
+
startOperation();
try {
return doUpdateRecord(record, recordInfo);
@@ -385,6 +419,48 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
+ @Override
+ public void takeSnapshot() {
+ if (!_enableSnapshot) {
+ return;
+ }
+ if (_stopped) {
+ _logger.info("Skip taking snapshot because metadata manager is already stopped");
+ return;
+ }
+ if (!_gotFirstConsumingSegment) {
+ _logger.info("Skip taking snapshot before getting the first consuming segment");
+ return;
+ }
+
+ _snapshotLock.writeLock().lock();
+ startOperation();
+ try {
+ doTakeSnapshot();
+ } finally {
+ finishOperation();
+ _snapshotLock.writeLock().unlock();
+ }
+ }
+
+ // TODO: Consider optimizing it by tracking and persisting only the changed snapshot
+ protected void doTakeSnapshot() {
+ int numTrackedSegments = _trackedSegments.size();
+ _logger.info("Taking snapshot for {} segments", numTrackedSegments);
+ long startTimeMs = System.currentTimeMillis();
+
+ int numImmutableSegments = 0;
+ for (IndexSegment segment : _trackedSegments) {
+ if (segment instanceof ImmutableSegmentImpl) {
+ ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot();
+ numImmutableSegments++;
+ }
+ }
+
+ _logger.info("Finished taking snapshot for {} immutable segments (out of {} total segments) in {}ms",
+ numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
+ }
+
protected void startOperation() {
_numPendingOperations.getAndIncrement();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index dd07143fd3..55cd8497cb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -84,6 +84,12 @@ public interface PartitionUpsertMetadataManager extends Closeable {
*/
GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);
+ /**
+ * Takes snapshot for all the tracked immutable segments when snapshot is enabled. This method should be invoked
+ * before a new consuming segment starts consuming.
+ */
+ void takeSnapshot();
+
/**
* Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
*/
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
deleted file mode 100644
index 4bb049cc2c..0000000000
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.indexsegment.immutable;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
-import org.apache.pinot.spi.config.table.HashFunction;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.Mockito;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-
-public class ImmutableSegmentImplUpsertSnapshotTest {
- private static final String AVRO_FILE = "data/test_data-mv.avro";
- private static final String SCHEMA = "data/testDataMVSchema.json";
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ImmutableSegmentImplTest");
-
- private SegmentDirectory _segmentDirectory;
- private SegmentMetadataImpl _segmentMetadata;
- private PinotConfiguration _configuration;
- private TableConfig _tableConfig;
- private Schema _schema;
-
- private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
- private ImmutableSegmentImpl _immutableSegmentImpl;
-
- @BeforeClass
- public void setUp()
- throws Exception {
- FileUtils.deleteQuietly(INDEX_DIR);
-
- Map<String, Object> props = new HashMap<>();
- props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
- _configuration = new PinotConfiguration(props);
-
- _segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(INDEX_DIR.toURI(),
- new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
-
- URL resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(AVRO_FILE);
- Assert.assertNotNull(resourceUrl);
- File avroFile = new File(resourceUrl.getFile());
-
- IngestionConfig ingestionConfig = new IngestionConfig();
- ingestionConfig.setRowTimeValueCheck(false);
- ingestionConfig.setSegmentTimeValueCheck(false);
-
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- upsertConfig.setEnableSnapshot(true);
-
- _tableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
- .setIngestionConfig(ingestionConfig).setUpsertConfig(upsertConfig).build();
-
- resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(SCHEMA);
- _schema = Schema.fromFile(new File(resourceUrl.getFile()));
-
- SegmentGeneratorConfig config =
- SegmentTestUtils.getSegmentGeneratorConfigWithSchema(avroFile, INDEX_DIR, "testTable", _tableConfig, _schema);
-
- SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
- driver.init(config);
- driver.build();
- _segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
- Mockito.when(_segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap<>());
- Mockito.when(_segmentMetadata.getIndexDir()).thenReturn(INDEX_DIR);
- _immutableSegmentImpl = new ImmutableSegmentImpl(_segmentDirectory, _segmentMetadata, new HashMap<>(), null);
-
- ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
- _partitionUpsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, true, serverMetrics);
-
- _immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap());
- }
-
- @Test
- public void testPersistValidDocIdsSnapshot() {
- int[] docIds1 = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
- MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
- validDocIds.add(docIds1);
-
- _immutableSegmentImpl.persistValidDocIdsSnapshot(validDocIds);
- assertTrue(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
- V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
-
- MutableRoaringBitmap bitmap = _immutableSegmentImpl.loadValidDocIdsFromSnapshot();
- assertEquals(bitmap.toArray(), docIds1);
-
- _immutableSegmentImpl.deleteValidDocIdsSnapshot();
- assertFalse(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
- V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
- }
-
- @AfterMethod
- public void tearDown() {
- FileUtils.deleteQuietly(INDEX_DIR);
- }
-}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 1e3f130059..a89f7e9d64 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
@@ -53,7 +54,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@@ -78,6 +78,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
// Add the first segment
int numRecords = 6;
@@ -99,6 +100,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
}
upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
+ trackedSegments.add(segment1);
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
assertEquals(recordLocationMap.size(), 3);
checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
@@ -125,6 +127,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps);
}
upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator());
+ trackedSegments.add(segment2);
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
@@ -153,6 +156,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1);
upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1);
+ trackedSegments.add(newSegment1);
+ trackedSegments.remove(segment1);
// original segment1: 1 -> {4, 120} (not in the map)
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
@@ -164,7 +169,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
- assertEquals(upsertMetadataManager._replacedSegments, Collections.singleton(segment1));
// Remove the original segment1
upsertMetadataManager.removeSegment(segment1);
@@ -178,7 +182,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
- assertTrue(upsertMetadataManager._replacedSegments.isEmpty());
// Remove the empty segment
upsertMetadataManager.removeSegment(emptySegment);
@@ -200,6 +203,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(trackedSegments, Collections.singleton(newSegment1));
// Stop the metadata manager
upsertMetadataManager.stop();
@@ -210,6 +214,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(recordLocationMap.size(), 1);
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(trackedSegments, Collections.singleton(newSegment1));
// Close the metadata manager
upsertMetadataManager.close();
@@ -218,8 +223,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
List<RecordInfo> recordInfoList = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
- recordInfoList.add(
- new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
+ recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
}
return recordInfoList;
}
@@ -288,8 +292,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertNotNull(recordLocation);
assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
- assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value,
- comparisonValue);
+ assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org