You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xb...@apache.org on 2023/12/06 22:54:16 UTC
(pinot) branch master updated: need to synchronize replacing upsert segment (#12105)
This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3811240629 need to synchronize replacing upsert segment (#12105)
3811240629 is described below
commit 38112406291abc2f7b9612052fc6b442f8f5ea29
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Dec 6 14:54:10 2023 -0800
need to synchronize replacing upsert segment (#12105)
* Need to synchronize replacing an upsert segment, because segment replacement may happen in two threads concurrently but the replacement logic is not thread-safe
---
.../manager/realtime/RealtimeTableDataManager.java | 44 +++++++++++++++-------
.../upsert/BasePartitionUpsertMetadataManager.java | 44 ++++++++++++----------
...rrentMapPartitionUpsertMetadataManagerTest.java | 8 ++--
3 files changed, 59 insertions(+), 37 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5aa08dd61f..ffe0d47b46 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
@@ -61,6 +62,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.segment.local.utils.SchemaUtils;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -528,22 +530,36 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
// Register the new segment after it is fully initialized by partitionUpsertMetadataManager, e.g. to fill up its
// validDocId bitmap. Otherwise, the query can return wrong results, if accessing the premature segment.
- SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
- if (oldSegmentManager == null) {
- if (_tableUpsertMetadataManager.isPreloading()) {
- partitionUpsertMetadataManager.preloadSegment(immutableSegment);
- } else {
+ if (_tableUpsertMetadataManager.isPreloading()) {
+ // Segment preloading happens while the table manager is being created on server restart, and each segment is
+ // guaranteed to be preloaded by a single thread, so there is no need for segmentLock.
+ partitionUpsertMetadataManager.preloadSegment(immutableSegment);
+ registerSegment(segmentName, newSegmentManager);
+ _logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
+ return;
+ }
+ // Replacing a segment may happen in two threads concurrently, i.e. the consuming thread that's committing the
+ // mutable segment, and a HelixTaskExecutor thread that's bringing the segment from CONSUMING to ONLINE when the
+ // server finds the consuming thread can't commit the segment in time. Adding a segment should only be done by a
+ // single HelixTaskExecutor thread, but it is done under segmentLock as well for simplicity.
+ Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+ segmentLock.lock();
+ try {
+ SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
+ if (oldSegmentManager == null) {
partitionUpsertMetadataManager.addSegment(immutableSegment);
+ registerSegment(segmentName, newSegmentManager);
+ _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
+ } else {
+ IndexSegment oldSegment = oldSegmentManager.getSegment();
+ partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
+ registerSegment(segmentName, newSegmentManager);
+ _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+ oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
+ releaseSegment(oldSegmentManager);
}
- registerSegment(segmentName, newSegmentManager);
- _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
- } else {
- IndexSegment oldSegment = oldSegmentManager.getSegment();
- partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
- registerSegment(segmentName, newSegmentManager);
- _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
- oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
- releaseSegment(oldSegmentManager);
+ } finally {
+ segmentLock.unlock();
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 4496041a59..9184da1587 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -292,7 +292,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn)) {
- addSegmentUnsafe(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
+ doPreloadSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
@@ -308,36 +308,42 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
/**
+ * NOTE: no need to get segmentLock to preload segment as callers ensure the segment is processed by a single thread.
* NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
* tests. The passed in bitmaps should always be empty.
*/
@VisibleForTesting
- public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+ void doPreloadSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
- String segmentName = segment.getSegmentName();
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
- segmentLock.lock();
- try {
- addSegmentUnsafe(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
- } finally {
- segmentLock.unlock();
- }
- }
-
- @VisibleForTesting
- void addSegmentUnsafe(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
- boolean isPreloading) {
if (validDocIds == null) {
validDocIds = new ThreadSafeMutableRoaringBitmap();
}
if (queryableDocIds == null && _deleteRecordColumn != null) {
queryableDocIds = new ThreadSafeMutableRoaringBitmap();
}
- if (isPreloading) {
- addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
- } else {
+ addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
+ }
+
+ /**
+ * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
+ * tests. The passed in bitmaps should always be empty.
+ */
+ @VisibleForTesting
+ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+ String segmentName = segment.getSegmentName();
+ Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+ segmentLock.lock();
+ try {
+ if (validDocIds == null) {
+ validDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
+ if (queryableDocIds == null && _deleteRecordColumn != null) {
+ queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+ } finally {
+ segmentLock.unlock();
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 8a3949a9eb..35e621245c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -844,8 +844,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl segment1 =
mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
// Preloading segment adds the segment without checking for upsert.
- upsertMetadataManager.addSegmentUnsafe(segment1, validDocIds1, null,
- getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+ upsertMetadataManager.doPreloadSegment(segment1, validDocIds1, null,
+ getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -861,8 +861,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 =
mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
- upsertMetadataManager.addSegmentUnsafe(segment2, validDocIds2, null,
- getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+ upsertMetadataManager.doPreloadSegment(segment2, validDocIds2, null,
+ getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 0 -> {0, 1}, 1 -> {1, 2}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org