You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xb...@apache.org on 2023/11/30 22:15:00 UTC

(pinot) branch master updated: no need for segment locks during segment preloading (#12077)

This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d7c76b9fbb no need for segment locks during segment preloading (#12077)
d7c76b9fbb is described below

commit d7c76b9fbb5a659da0da6276c6d9833ba1894e0d
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Thu Nov 30 14:14:54 2023 -0800

    no need for segment locks during segment preloading (#12077)
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 38 +++++++++++-----------
 .../upsert/BaseTableUpsertMetadataManager.java     | 19 +++++------
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  4 +--
 3 files changed, 29 insertions(+), 32 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 0e54d399bf..3e1fd4e178 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -281,7 +281,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
     try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
         _comparisonColumns, _deleteRecordColumn)) {
-      addSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
+      addSegmentUnsafe(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
@@ -303,33 +303,33 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @VisibleForTesting
   public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
-    addSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
-  }
-
-  @VisibleForTesting
-  public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
-      boolean isPreloading) {
     String segmentName = segment.getSegmentName();
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
-      if (validDocIds == null) {
-        validDocIds = new ThreadSafeMutableRoaringBitmap();
-      }
-      if (queryableDocIds == null && _deleteRecordColumn != null) {
-        queryableDocIds = new ThreadSafeMutableRoaringBitmap();
-      }
-      if (isPreloading) {
-        addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
-      } else {
-        addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
-      }
+      addSegmentUnsafe(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
     } finally {
       segmentLock.unlock();
     }
   }
 
+  @VisibleForTesting
+  void addSegmentUnsafe(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+      boolean isPreloading) {
+    if (validDocIds == null) {
+      validDocIds = new ThreadSafeMutableRoaringBitmap();
+    }
+    if (queryableDocIds == null && _deleteRecordColumn != null) {
+      queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+    }
+    if (isPreloading) {
+      addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
+    } else {
+      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+    }
+  }
+
   protected abstract long getNumPrimaryKeys();
 
   protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index bc783480b9..6352bc7fcb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
@@ -41,7 +40,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
@@ -255,15 +253,14 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   @VisibleForTesting
   void preloadSegmentWithSnapshot(String segmentName, IndexLoadingConfig indexLoadingConfig,
       SegmentZKMetadata zkMetadata) {
-    // This method might modify the file on disk. Use segment lock to prevent race condition
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
-    try {
-      segmentLock.lock();
-      // This method checks segment crc and if it has changed, the segment is not loaded.
-      _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
-    } finally {
-      segmentLock.unlock();
-    }
+    // This method checks the segment CRC and skips loading the segment if the CRC has changed. It might modify the
+    // file on disk, but there is no need to take the segmentLock here, because each segment of the current table is
+    // processed by at most one thread from the preloading thread pool, and HelixTaskExecutor threads about to
+    // process segments of the same table are blocked on the ConcurrentHashMap lock, as in
+    // HelixInstanceDataManager.addRealtimeSegment(). In fact, taking the segmentLock during preloading could cause
+    // a deadlock: segment locks are chosen by hashing the table name and segment name, so a HelixTaskExecutor
+    // thread processing another table may already hold the same lock due to a hash collision.
+    _tableDataManager.tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata);
   }
 
   private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index b212a8b918..deb783c3e3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -834,7 +834,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment1 =
         mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
     // Preloading segment adds the segment without checking for upsert.
-    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+    upsertMetadataManager.addSegmentUnsafe(segment1, validDocIds1, null,
         getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
@@ -851,7 +851,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 =
         mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
-    upsertMetadataManager.addSegment(segment2, validDocIds2, null,
+    upsertMetadataManager.addSegmentUnsafe(segment2, validDocIds2, null,
         getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org