You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xb...@apache.org on 2023/11/30 22:15:00 UTC
(pinot) branch master updated: no need for segment locks during segment preloading (#12077)
This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d7c76b9fbb no need for segment locks during segment preloading (#12077)
d7c76b9fbb is described below
commit d7c76b9fbb5a659da0da6276c6d9833ba1894e0d
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Thu Nov 30 14:14:54 2023 -0800
no need for segment locks during segment preloading (#12077)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 38 +++++++++++-----------
.../upsert/BaseTableUpsertMetadataManager.java | 19 +++++------
...rrentMapPartitionUpsertMetadataManagerTest.java | 4 +--
3 files changed, 29 insertions(+), 32 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 0e54d399bf..3e1fd4e178 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -281,7 +281,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn)) {
- addSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
+ addSegmentUnsafe(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
@@ -303,33 +303,33 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@VisibleForTesting
public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
- addSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
- }
-
- @VisibleForTesting
- public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
- boolean isPreloading) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
try {
- if (validDocIds == null) {
- validDocIds = new ThreadSafeMutableRoaringBitmap();
- }
- if (queryableDocIds == null && _deleteRecordColumn != null) {
- queryableDocIds = new ThreadSafeMutableRoaringBitmap();
- }
- if (isPreloading) {
- addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
- } else {
- addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
- }
+ addSegmentUnsafe(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
} finally {
segmentLock.unlock();
}
}
+ @VisibleForTesting
+ void addSegmentUnsafe(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+ boolean isPreloading) {
+ if (validDocIds == null) {
+ validDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
+ if (queryableDocIds == null && _deleteRecordColumn != null) {
+ queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
+ if (isPreloading) {
+ addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
+ } else {
+ addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+ }
+ }
+
protected abstract long getNumPrimaryKeys();
protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index bc783480b9..6352bc7fcb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
@@ -41,7 +40,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
@@ -255,15 +253,14 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
@VisibleForTesting
void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
SegmentZKMetadata zkMetadata) {
- // This method might modify the file on disk. Use segment lock to prevent race condition
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
- try {
- segmentLock.lock();
- // This method checks segment crc and if it has changed, the segment is not loaded.
- _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
- } finally {
- segmentLock.unlock();
- }
+ // This method checks segment crc and if it has changed, the segment is not loaded. It might modify the file on
+ // disk, but we don't need to take the segmentLock, because every segment from the current table is processed by
+ // at most one thread from the preloading thread pool. HelixTaskExecutor task threads about to process segments
+ // from the same table are blocked on ConcurrentHashMap lock as in HelixInstanceDataManager.addRealtimeSegment().
+ // In fact, taking segmentLock during segment preloading phase could cause deadlock when HelixTaskExecutor
+ // threads processing other tables have taken the same segmentLock as decided by the hash of table name and
+ // segment name, i.e. due to hash collision.
+ _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
}
private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index b212a8b918..deb783c3e3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -834,7 +834,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl segment1 =
mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
// Preloading segment adds the segment without checking for upsert.
- upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ upsertMetadataManager.addSegmentUnsafe(segment1, validDocIds1, null,
getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
@@ -851,7 +851,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 =
mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
- upsertMetadataManager.addSegment(segment2, validDocIds2, null,
+ upsertMetadataManager.addSegmentUnsafe(segment2, validDocIds2, null,
getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org