You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/02/09 18:29:46 UTC
[pinot] branch master updated: Avoid upsert metadata access after the manager is closed (#10251)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2c57462d3b Avoid upsert metadata access after the manager is closed (#10251)
2c57462d3b is described below
commit 2c57462d3bd3e5b2e15283e45f263adb773f1dc8
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 9 10:29:38 2023 -0800
Avoid upsert metadata access after the manager is closed (#10251)
---
.../core/data/manager/BaseTableDataManager.java | 8 +
.../realtime/LLRealtimeSegmentDataManager.java | 5 +
.../manager/realtime/RealtimeTableDataManager.java | 12 +-
.../local/data/manager/TableDataManager.java | 2 +
.../upsert/BasePartitionUpsertMetadataManager.java | 161 ++++++++++++++++++---
...oncurrentMapPartitionUpsertMetadataManager.java | 10 +-
.../ConcurrentMapTableUpsertMetadataManager.java | 12 +-
.../upsert/PartitionUpsertMetadataManager.java | 5 +
.../local/upsert/TableUpsertMetadataManager.java | 5 +
...rrentMapPartitionUpsertMetadataManagerTest.java | 43 +++++-
10 files changed, 223 insertions(+), 40 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6ce2c3e451..7a3a32f903 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -107,6 +107,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
// Cache used for identifying segments which could not be acquired since they were recently deleted.
protected Cache<String, String> _recentlyDeletedSegments;
+ protected volatile boolean _shutDown;
+
@Override
public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
@@ -182,12 +184,18 @@ public abstract class BaseTableDataManager implements TableDataManager {
@Override
public void shutDown() {
_logger.info("Shutting down table data manager for table: {}", _tableNameWithType);
+ _shutDown = true;
doShutdown();
_logger.info("Shut down table data manager for table: {}", _tableNameWithType);
}
protected abstract void doShutdown();
+ @Override
+ public boolean isShutDown() {
+ return _shutDown;
+ }
+
/**
* {@inheritDoc}
* <p>If one segment already exists with the same name, replaces it with the new one.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6e43a77202..4d82f6953e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -873,6 +873,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
closeStreamConsumers();
+ // Do not allow building segment when table data manager is already shut down
+ if (_realtimeTableDataManager.isShutDown()) {
+ _segmentLogger.warn("Table data manager is already shut down");
+ return null;
+ }
try {
final long startTimeMillis = now();
if (_segBuildSemaphore != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 9315770943..96adea8ee6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -246,14 +246,20 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
@Override
protected void doShutdown() {
if (_tableUpsertMetadataManager != null) {
+ // Stop the upsert metadata manager first to prevent removing metadata when destroying segments
+ _tableUpsertMetadataManager.stop();
+ for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
+ segmentDataManager.destroy();
+ }
try {
_tableUpsertMetadataManager.close();
} catch (IOException e) {
_logger.warn("Cannot close upsert metadata manager properly for table: {}", _tableNameWithType, e);
}
- }
- for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
- segmentDataManager.destroy();
+ } else {
+ for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
+ segmentDataManager.destroy();
+ }
}
if (_leaseExtender != null) {
_leaseExtender.shutDown();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 121e47f6f6..ae53ce3383 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -62,6 +62,8 @@ public interface TableDataManager {
*/
void shutDown();
+ boolean isShutDown();
+
/**
* Adds a loaded immutable segment into the table.
*/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 6bc9e94067..361052fdc2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -37,8 +38,10 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +64,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@VisibleForTesting
public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
- protected volatile boolean _closed = false;
+ protected volatile boolean _stopped = false;
+ // Initialize with 1 pending operation to indicate the metadata manager can take more operations
+ protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
protected int _numOutOfOrderEvents = 0;
@@ -87,6 +92,19 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@Override
public void addSegment(ImmutableSegment segment) {
+ if (_stopped) {
+ _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
+ return;
+ }
+ startOperation();
+ try {
+ doAddSegment(segment);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ protected void doAddSegment(ImmutableSegment segment) {
String segmentName = segment.getSegmentName();
_logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
@@ -162,8 +180,38 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
@Nullable MutableRoaringBitmap validDocIdsForOldSegment);
+ @Override
+ public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
+ if (_stopped) {
+ _logger.debug("Skip adding record to segment: {} because metadata manager is already stopped",
+ segment.getSegmentName());
+ return;
+ }
+ startOperation();
+ try {
+ doAddRecord(segment, recordInfo);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ protected abstract void doAddRecord(MutableSegment segment, RecordInfo recordInfo);
+
@Override
public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
+ if (_stopped) {
+ _logger.info("Skip replacing segment: {} because metadata manager is already stopped", segment.getSegmentName());
+ return;
+ }
+ startOperation();
+ try {
+ doReplaceSegment(segment, oldSegment);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ protected void doReplaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
String segmentName = segment.getSegmentName();
Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
"Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}",
@@ -250,34 +298,43 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@Override
public void removeSegment(IndexSegment segment) {
String segmentName = segment.getSegmentName();
- _logger.info("Removing {} segment: {}, current primary key count: {}",
- segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
-
if (_replacedSegments.remove(segment)) {
_logger.info("Skip removing replaced segment: {}", segmentName);
return;
}
+ // Allow persisting valid doc ids snapshot after metadata manager is stopped
+ MutableRoaringBitmap validDocIds =
+ segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+ if (_enableSnapshot && segment instanceof ImmutableSegmentImpl) {
+ ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
+ }
+ if (validDocIds == null || validDocIds.isEmpty()) {
+ _logger.info("Skip removing segment without valid docs: {}", segmentName);
+ return;
+ }
+ if (_stopped) {
+ _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
+ return;
+ }
+ startOperation();
+ try {
+ doRemoveSegment(segment);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ protected void doRemoveSegment(IndexSegment segment) {
+ String segmentName = segment.getSegmentName();
+ _logger.info("Removing {} segment: {}, current primary key count: {}",
+ segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
try {
- MutableRoaringBitmap validDocIds =
- segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
-
- if (_enableSnapshot && segment instanceof ImmutableSegmentImpl && validDocIds != null) {
- ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
- }
-
- if (_closed) {
- _logger.info("Skip removing segment: {} because metadata manager is already closed", segment);
- return;
- }
-
- if (validDocIds == null || validDocIds.isEmpty()) {
- _logger.info("Skip removing segment without valid docs: {}", segmentName);
- return;
- }
-
+ assert segment.getValidDocIds() != null;
+ MutableRoaringBitmap validDocIds = segment.getValidDocIds().getMutableRoaringBitmap();
+ assert validDocIds != null;
_logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
removeSegment(segment, validDocIds);
} finally {
@@ -292,6 +349,26 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
}
+ @Override
+ public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
+ // Directly return the record when partial-upsert is not enabled
+ if (_partialUpsertHandler == null) {
+ return record;
+ }
+ if (_stopped) {
+ _logger.debug("Skip updating record because metadata manager is already stopped");
+ return record;
+ }
+ startOperation();
+ try {
+ return doUpdateRecord(record, recordInfo);
+ } finally {
+ finishOperation();
+ }
+ }
+
+ protected abstract GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo);
+
protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) {
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
_numOutOfOrderEvents++;
@@ -305,10 +382,48 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
+ protected void startOperation() {
+ _numPendingOperations.getAndIncrement();
+ }
+
+ protected void finishOperation() {
+ if (_numPendingOperations.decrementAndGet() == 0) {
+ synchronized (_numPendingOperations) {
+ _numPendingOperations.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void stop() {
+ _stopped = true;
+ int numPendingOperations = _numPendingOperations.decrementAndGet();
+ _logger.info("Stopped the metadata manager with {} pending operations, current primary key count: {}",
+ numPendingOperations, getNumPrimaryKeys());
+ }
+
@Override
public void close()
throws IOException {
- _logger.info("Closing the metadata manager, current primary key count: {}", getNumPrimaryKeys());
- _closed = true;
+ Preconditions.checkState(_stopped, "Must stop the metadata manager before closing it");
+ _logger.info("Closing the metadata manager");
+ synchronized (_numPendingOperations) {
+ int numPendingOperations;
+ while ((numPendingOperations = _numPendingOperations.get()) != 0) {
+ _logger.info("Waiting for {} pending operations to finish", numPendingOperations);
+ try {
+ _numPendingOperations.wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ String.format("Interrupted while waiting for %d pending operations to finish", numPendingOperations), e);
+ }
+ }
+ }
+ doClose();
+ _logger.info("Closed the metadata manager");
+ }
+
+ protected void doClose()
+ throws IOException {
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index e11c1899f3..b1687afdf8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -182,7 +182,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
@Override
- public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
+ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(primaryKey, currentRecordLocation) -> {
@@ -217,12 +217,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
@Override
- public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
- // Directly return the record when partial-upsert is not enabled
- if (_partialUpsertHandler == null) {
- return record;
- }
-
+ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
+ assert _partialUpsertHandler != null;
AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>();
RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent(
HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 3f830bb3a7..67d6fe773b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -39,12 +39,18 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
_comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
}
+ @Override
+ public void stop() {
+ for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : _partitionMetadataManagerMap.values()) {
+ metadataManager.stop();
+ }
+ }
+
@Override
public void close()
throws IOException {
- for (ConcurrentMapPartitionUpsertMetadataManager partitionUpsertMetadataManager
- : _partitionMetadataManagerMap.values()) {
- partitionUpsertMetadataManager.close();
+ for (ConcurrentMapPartitionUpsertMetadataManager metadataManager : _partitionMetadataManagerMap.values()) {
+ metadataManager.close();
}
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index ef5ec7c414..dd07143fd3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -83,4 +83,9 @@ public interface PartitionUpsertMetadataManager extends Closeable {
* Returns the merged record when partial-upsert is enabled.
*/
GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);
+
+ /**
+ * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
+ */
+ void stop();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index de46879712..52007f16fe 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -38,4 +38,9 @@ public interface TableUpsertMetadataManager extends Closeable {
PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
UpsertConfig.Mode getUpsertMode();
+
+ /**
+ * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
+ */
+ void stop();
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index a0f18aaf14..6b4bf34368 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -60,7 +61,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@Test
- public void testAddReplaceRemoveSegment() {
+ public void testAddReplaceRemoveSegment()
+ throws IOException {
verifyAddReplaceRemoveSegment(HashFunction.NONE, false);
verifyAddReplaceRemoveSegment(HashFunction.MD5, false);
verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, false);
@@ -69,7 +71,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true);
}
- private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) {
+ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
+ throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
"timeCol", hashFunction, null, false, mock(ServerMetrics.class));
@@ -196,6 +199,19 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Remove new segment1, should be no-op
+ upsertMetadataManager.removeSegment(newSegment1);
+ // new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
@@ -274,13 +290,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
}
@Test
- public void testAddRecord() {
+ public void testAddRecord()
+ throws IOException {
verifyAddRecord(HashFunction.NONE);
verifyAddRecord(HashFunction.MD5);
verifyAddRecord(HashFunction.MURMUR3);
}
- private void verifyAddRecord(HashFunction hashFunction) {
+ private void verifyAddRecord(HashFunction hashFunction)
+ throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
"timeCol", hashFunction, null, false, mock(ServerMetrics.class));
@@ -339,6 +357,23 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Add record should be no-op
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120)));
+ // segment1: 1 -> {1, 120}
+ // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
+ checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org