You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/07/29 20:29:51 UTC
[pinot] branch master updated: Fix upsert replace (#9132)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 91289be930 Fix upsert replace (#9132)
91289be930 is described below
commit 91289be930754dee74776e804ce388d005b06e3b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 29 13:29:44 2022 -0700
Fix upsert replace (#9132)
- Fix upsert replace to:
- Correctly replace record location without removing the valid docs from the replaced segment
- Correctly track and remove the remaining valid docs from the replaced segment
- Track replaced segment so that the docs do not need to be removed again
- Use segment lock to prevent adding/replacing/removing the same segment at the same time, which can cause a race condition
- Track an unexpected case where the primary keys show up in the wrong segment using meter `UPSERT_KEYS_IN_WRONG_SEGMENT`
---
.../apache/pinot/common/metrics/ServerMeter.java | 3 +-
.../manager/realtime/RealtimeTableDataManager.java | 7 +-
.../upsert/PartitionUpsertMetadataManager.java | 191 ++++++++++++++++-----
.../pinot/segment/local/utils}/SegmentLocks.java | 2 +-
.../upsert/PartitionUpsertMetadataManagerTest.java | 93 ++++------
.../starter/helix/HelixInstanceDataManager.java | 1 +
.../SegmentOnlineOfflineStateModelFactory.java | 1 +
7 files changed, 188 insertions(+), 110 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 22bb82abd4..c0d173fc76 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -41,8 +41,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
REALTIME_PARTITION_MISMATCH("mismatch", false),
REALTIME_DEDUP_DROPPED("rows", false),
+ UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
- PARTIAL_UPSERT_ROWS_NOT_REPLACED("rows", false),
+ PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
ROWS_WITH_ERRORS("rows", false),
LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 654133af11..0a09ff4087 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -430,12 +430,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
_logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
} else {
IndexSegment oldSegment = oldSegmentManager.getSegment();
- partitionUpsertMetadataManager.addSegment(immutableSegment);
- // TODO: Fix the following issue about replacing segment in upsert metadata
- // - We cannot directly invalidate the docs in the replaced segment because query might still running against it
- // - We should track the valid docs in the replaced segment separately. Currently the docs won't be invalidate
- // in the replaced segment due to the reason above, and will cause wrong logs/metrics emitted.
- // partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
_logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
releaseSegment(oldSegmentManager);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 39455c569e..d1042fe5c1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -23,9 +23,12 @@ import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
@@ -36,6 +39,7 @@ import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.RecordInfo;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
@@ -92,6 +96,9 @@ public class PartitionUpsertMetadataManager {
@VisibleForTesting
final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
+
// Reused for reading previous record during partial upsert
private final GenericRow _reuse = new GenericRow();
@@ -122,6 +129,12 @@ public class PartitionUpsertMetadataManager {
* Initializes the upsert metadata for the given immutable segment.
*/
public void addSegment(ImmutableSegment segment) {
+ addSegment(segment, null, null);
+ }
+
+ @VisibleForTesting
+ void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable Iterator<RecordInfo> recordInfoIterator) {
String segmentName = segment.getSegmentName();
_logger.info("Adding segment: {}, current primary key count: {}", segmentName,
_primaryKeyToRecordLocationMap.size());
@@ -131,10 +144,22 @@ public class PartitionUpsertMetadataManager {
return;
}
- Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
- "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
- _tableNameWithType);
- addSegment((ImmutableSegmentImpl) segment, new ThreadSafeMutableRoaringBitmap(), getRecordInfoIterator(segment));
+ Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+ segmentLock.lock();
+ try {
+ Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+ "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+ _tableNameWithType);
+ if (validDocIds == null) {
+ validDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
+ if (recordInfoIterator == null) {
+ recordInfoIterator = getRecordInfoIterator(segment);
+ }
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null);
+ } finally {
+ segmentLock.unlock();
+ }
// Update metrics
int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
@@ -180,11 +205,13 @@ public class PartitionUpsertMetadataManager {
}
}
- @VisibleForTesting
- void addSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
- Iterator<RecordInfo> recordInfoIterator) {
+ private void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+ Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
+ @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
String segmentName = segment.getSegmentName();
segment.enableUpsert(this, validDocIds);
+
+ AtomicInteger numKeysInWrongSegment = new AtomicInteger();
while (recordInfoIterator.hasNext()) {
RecordInfo recordInfo = recordInfoIterator.next();
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
@@ -198,7 +225,7 @@ public class PartitionUpsertMetadataManager {
// The current record is in the same segment
// Update the record location when there is a tie to keep the newer record. Note that the record info
// iterator will return records with incremental doc ids.
- if (segment == currentSegment) {
+ if (currentSegment == segment) {
if (comparisonResult >= 0) {
validDocIds.replace(currentRecordLocation.getDocId(), recordInfo.getDocId());
return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
@@ -211,9 +238,25 @@ public class PartitionUpsertMetadataManager {
// This could happen when committing a consuming segment, or reloading a completed segment. In this
// case, we want to update the record location when there is a tie because the record locations should
// point to the new added segment instead of the old segment being replaced. Also, do not update the valid
- // doc ids for the old segment because it has not been replaced yet.
+ // doc ids for the old segment because it has not been replaced yet. We pass in an optional valid doc ids
+ // snapshot for the old segment, which can be updated and used to track the docs not replaced yet.
+ if (currentSegment == oldSegment) {
+ if (comparisonResult >= 0) {
+ validDocIds.add(recordInfo.getDocId());
+ if (validDocIdsForOldSegment != null) {
+ validDocIdsForOldSegment.remove(currentRecordLocation.getDocId());
+ }
+ return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ } else {
+ return currentRecordLocation;
+ }
+ }
+
+ // This should not happen because the previously replaced segment should have all keys removed. We still
+ // handle it here, and also track the number of keys not properly replaced previously.
String currentSegmentName = currentSegment.getSegmentName();
- if (segmentName.equals(currentSegmentName)) {
+ if (currentSegmentName.equals(segmentName)) {
+ numKeysInWrongSegment.getAndIncrement();
if (comparisonResult >= 0) {
validDocIds.add(recordInfo.getDocId());
return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
@@ -243,6 +286,11 @@ public class PartitionUpsertMetadataManager {
}
});
}
+ int numKeys = numKeysInWrongSegment.get();
+ if (numKeys > 0) {
+ _logger.warn("Found {} primary keys in the wrong segment when adding segment: {}", numKeys, segmentName);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.UPSERT_KEYS_IN_WRONG_SEGMENT, numKeys);
+ }
}
/**
@@ -285,36 +333,73 @@ public class PartitionUpsertMetadataManager {
/**
* Replaces the upsert metadata for the old segment with the new immutable segment.
*/
- public void replaceSegment(ImmutableSegment newSegment, IndexSegment oldSegment) {
- String segmentName = newSegment.getSegmentName();
+ public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
+ replaceSegment(segment, null, null, oldSegment);
+ }
+
+ @VisibleForTesting
+ void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
+ String segmentName = segment.getSegmentName();
Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
"Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}",
_tableNameWithType, oldSegment.getSegmentName(), segmentName);
- _logger.info("Replacing {} segment: {}", oldSegment instanceof ImmutableSegment ? "immutable" : "mutable",
- segmentName);
-
- addSegment(newSegment);
-
- MutableRoaringBitmap validDocIds =
- oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (validDocIds != null && !validDocIds.isEmpty()) {
- int numDocsNotReplaced = validDocIds.getCardinality();
- if (_partialUpsertHandler != null) {
- // For partial-upsert table, because we do not restore the original record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a consuming
- // segment is replaced by a committed segment that is consumed from a different server with different records
- // (some stream consumer cannot guarantee consuming the messages in the same order).
- _logger.error("{} primary keys not replaced when replacing segment: {} for partial-upsert table. This can "
- + "potentially cause inconsistency between replicas", numDocsNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_ROWS_NOT_REPLACED,
- numDocsNotReplaced);
+ _logger.info("Replacing {} segment: {}, current primary key count: {}",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
+ _primaryKeyToRecordLocationMap.size());
+
+ Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+ segmentLock.lock();
+ try {
+ MutableRoaringBitmap validDocIdsForOldSegment =
+ oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+ if (segment instanceof EmptyIndexSegment) {
+ _logger.info("Skip adding empty segment: {}", segmentName);
} else {
- _logger.info("{} primary keys not replaced when replacing segment: {}", numDocsNotReplaced, segmentName);
+ Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+ "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+ _tableNameWithType);
+ if (validDocIds == null) {
+ validDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
+ if (recordInfoIterator == null) {
+ recordInfoIterator = getRecordInfoIterator(segment);
+ }
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
+ validDocIdsForOldSegment);
}
- removeSegment(oldSegment);
+
+ if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
+ int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+ if (_partialUpsertHandler != null) {
+ // For partial-upsert table, because we do not restore the original record location when removing the primary
+ // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
+ // consuming segment is replaced by a committed segment that is consumed from a different server with
+ // different records (some stream consumer cannot guarantee consuming the messages in the same order).
+ _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+ + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+ numKeysNotReplaced);
+ } else {
+ _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced,
+ segmentName);
+ }
+ removeSegment(oldSegment, validDocIdsForOldSegment);
+ }
+ } finally {
+ segmentLock.unlock();
+ }
+
+ if (!(oldSegment instanceof EmptyIndexSegment)) {
+ _replacedSegments.add(oldSegment);
}
- _logger.info("Finished replacing segment: {}", segmentName);
+ // Update metrics
+ int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+ numPrimaryKeys);
+
+ _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
}
/**
@@ -326,14 +411,37 @@ public class PartitionUpsertMetadataManager {
segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
_primaryKeyToRecordLocationMap.size());
- MutableRoaringBitmap validDocIds =
- segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (validDocIds == null || validDocIds.isEmpty()) {
- _logger.info("Skip removing segment without valid docs: {}", segmentName);
+ if (_replacedSegments.remove(segment)) {
+ _logger.info("Skip removing replaced segment: {}", segmentName);
return;
}
- _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
+ Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+ segmentLock.lock();
+ try {
+ MutableRoaringBitmap validDocIds =
+ segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+ if (validDocIds == null || validDocIds.isEmpty()) {
+ _logger.info("Skip removing segment without valid docs: {}", segmentName);
+ return;
+ }
+
+ _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
+ removeSegment(segment, validDocIds);
+ } finally {
+ segmentLock.unlock();
+ }
+
+ // Update metrics
+ int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+ numPrimaryKeys);
+
+ _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+ }
+
+ private void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+ assert !validDocIds.isEmpty();
PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
PeekableIntIterator iterator = validDocIds.getIntIterator();
while (iterator.hasNext()) {
@@ -347,13 +455,6 @@ public class PartitionUpsertMetadataManager {
return recordLocation;
});
}
-
- // Update metrics
- int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
- _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
}
/**
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
similarity index 96%
rename from pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
index 0a14d05a7b..abbd2c906c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentLocks.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.segment.local.utils;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index 639f6321e4..aa1392d6df 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -45,8 +45,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
public class PartitionUpsertMetadataManagerTest {
@@ -54,13 +54,13 @@ public class PartitionUpsertMetadataManagerTest {
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@Test
- public void testAddSegment() {
- verifyAddSegment(HashFunction.NONE);
- verifyAddSegment(HashFunction.MD5);
- verifyAddSegment(HashFunction.MURMUR3);
+ public void testAddReplaceRemoveSegment() {
+ verifyAddReplaceRemoveSegment(HashFunction.NONE);
+ verifyAddReplaceRemoveSegment(HashFunction.MD5);
+ verifyAddReplaceRemoveSegment(HashFunction.MURMUR3);
}
- private void verifyAddSegment(HashFunction hashFunction) {
+ private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) {
PartitionUpsertMetadataManager upsertMetadataManager =
new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol",
hashFunction, null, mock(ServerMetrics.class));
@@ -76,6 +76,7 @@ public class PartitionUpsertMetadataManagerTest {
List<RecordInfo> recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ assertEquals(recordLocationMap.size(), 3);
checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
@@ -91,6 +92,7 @@ public class PartitionUpsertMetadataManagerTest {
getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ assertEquals(recordLocationMap.size(), 4);
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -99,9 +101,11 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
// Add an empty segment
- upsertMetadataManager.addSegment(new EmptyIndexSegment(mock(SegmentMetadataImpl.class)));
+ EmptyIndexSegment emptySegment = mockEmptySegment(3);
+ upsertMetadataManager.addSegment(emptySegment);
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ assertEquals(recordLocationMap.size(), 4);
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -112,10 +116,11 @@ public class PartitionUpsertMetadataManagerTest {
// Replace (reload) the first segment
ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1);
- upsertMetadataManager.addSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator());
- // original segment1: 1 -> {4, 120}
+ upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1);
+ // original segment1: 1 -> {4, 120} (not in the map)
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 4);
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
@@ -123,34 +128,42 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
- assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(),
- newSegment1);
+ assertEquals(upsertMetadataManager._replacedSegments, Collections.singleton(segment1));
// Remove the original segment1
upsertMetadataManager.removeSegment(segment1);
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 4);
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
- assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(),
- newSegment1);
+ assertTrue(upsertMetadataManager._replacedSegments.isEmpty());
- // Remove an empty segment
- upsertMetadataManager.removeSegment(new EmptyIndexSegment(mock(SegmentMetadataImpl.class)));
+ // Remove the empty segment
+ upsertMetadataManager.removeSegment(emptySegment);
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 4);
checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
- assertSame(recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(1), hashFunction)).getSegment(),
- newSegment1);
+
+ // Remove segment2
+ upsertMetadataManager.removeSegment(segment2);
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
+ // new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
}
private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
@@ -179,6 +192,12 @@ public class PartitionUpsertMetadataManagerTest {
return segment;
}
+ private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
+ return new EmptyIndexSegment(segmentMetadata);
+ }
+
private static MutableSegment mockMutableSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds) {
MutableSegment segment = mock(MutableSegment.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
@@ -272,46 +291,6 @@ public class PartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
}
- @Test
- public void testRemoveSegment() {
- verifyRemoveSegment(HashFunction.NONE);
- verifyRemoveSegment(HashFunction.MD5);
- verifyRemoveSegment(HashFunction.MURMUR3);
- }
-
- private void verifyRemoveSegment(HashFunction hashFunction) {
- PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"), "timeCol",
- hashFunction, null, mock(ServerMetrics.class));
- Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
-
- // Add 2 segments
- // segment1: 0 -> {0, 100}, 1 -> {1, 100}
- // segment2: 2 -> {0, 100}, 3 -> {0, 100}
- int numRecords = 2;
- int[] primaryKeys = new int[]{0, 1};
- int[] timestamps = new int[]{100, 100};
- ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
- ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
- upsertMetadataManager.addSegment(segment1, validDocIds1,
- getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
-
- primaryKeys = new int[]{2, 3};
- ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
- ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
- upsertMetadataManager.addSegment(segment2, validDocIds2,
- getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
-
- // Remove the first segment
- upsertMetadataManager.removeSegment(segment1);
- // segment2: 2 -> {0, 100}, 3 -> {0, 100}
- assertNull(recordLocationMap.get(makePrimaryKey(0)));
- assertNull(recordLocationMap.get(makePrimaryKey(1)));
- checkRecordLocation(recordLocationMap, 2, segment2, 0, 100, hashFunction);
- checkRecordLocation(recordLocationMap, 3, segment2, 1, 100, hashFunction);
- assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- }
-
@Test
public void testHashPrimaryKey() {
PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9904b95cb0..132fe44fe9 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -59,6 +59,7 @@ import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index d2e4c30ca6..9e541a29f2 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org