You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xb...@apache.org on 2023/12/06 22:54:16 UTC

(pinot) branch master updated: need to synchronize replacing upsert segment (#12105)

This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3811240629 need to synchronize replacing upsert segment (#12105)
3811240629 is described below

commit 38112406291abc2f7b9612052fc6b442f8f5ea29
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Dec 6 14:54:10 2023 -0800

    need to synchronize replacing upsert segment (#12105)
    
    * Synchronize replacing upsert segments, because segment replacement may happen in two threads concurrently and the replacing logic is not thread-safe.
---
 .../manager/realtime/RealtimeTableDataManager.java | 44 +++++++++++++++-------
 .../upsert/BasePartitionUpsertMetadataManager.java | 44 ++++++++++++----------
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  8 ++--
 3 files changed, 59 insertions(+), 37 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5aa08dd61f..ffe0d47b46 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
@@ -61,6 +62,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -528,22 +530,36 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
     // Register the new segment after it is fully initialized by partitionUpsertMetadataManager, e.g. to fill up its
     // validDocId bitmap. Otherwise, the query can return wrong results, if accessing the premature segment.
-    SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
-    if (oldSegmentManager == null) {
-      if (_tableUpsertMetadataManager.isPreloading()) {
-        partitionUpsertMetadataManager.preloadSegment(immutableSegment);
-      } else {
+    if (_tableUpsertMetadataManager.isPreloading()) {
+      // Segment preloading happens while the table data manager is being created on server restart, and each
+      // segment is guaranteed to be preloaded by a single thread, so there is no need to acquire the segmentLock.
+      partitionUpsertMetadataManager.preloadSegment(immutableSegment);
+      registerSegment(segmentName, newSegmentManager);
+      _logger.info("Preloaded immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
+      return;
+    }
+    // Replacing a segment may happen in two threads concurrently, i.e. the consuming thread that's committing the
+    // mutable segment, and a HelixTaskExecutor thread that's bringing the segment from CONSUMING to ONLINE when the
+    // server finds the consuming thread can't commit the segment in time. Adding a segment should be done by a
+    // single HelixTaskExecutor thread, but it's done under the segmentLock as well for simplicity.
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
+      if (oldSegmentManager == null) {
         partitionUpsertMetadataManager.addSegment(immutableSegment);
+        registerSegment(segmentName, newSegmentManager);
+        _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
+      } else {
+        IndexSegment oldSegment = oldSegmentManager.getSegment();
+        partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
+        registerSegment(segmentName, newSegmentManager);
+        _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
+            oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
+        releaseSegment(oldSegmentManager);
       }
-      registerSegment(segmentName, newSegmentManager);
-      _logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, _tableNameWithType);
-    } else {
-      IndexSegment oldSegment = oldSegmentManager.getSegment();
-      partitionUpsertMetadataManager.replaceSegment(immutableSegment, oldSegment);
-      registerSegment(segmentName, newSegmentManager);
-      _logger.info("Replaced {} segment: {} of upsert-enabled table: {}",
-          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, _tableNameWithType);
-      releaseSegment(oldSegmentManager);
+    } finally {
+      segmentLock.unlock();
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 4496041a59..9184da1587 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -292,7 +292,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
     try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
         _comparisonColumns, _deleteRecordColumn)) {
-      addSegmentUnsafe(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds), true);
+      doPreloadSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while preloading segment: %s, table: %s", segmentName, _tableNameWithType),
@@ -308,36 +308,42 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   }
 
   /**
+   * NOTE: No need to acquire the segmentLock to preload the segment, as callers ensure the segment is processed by a
+   *       single thread.
    * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
    *       tests. The passed in bitmaps should always be empty.
    */
   @VisibleForTesting
-  public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+  void doPreloadSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
-    String segmentName = segment.getSegmentName();
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
-    segmentLock.lock();
-    try {
-      addSegmentUnsafe(segment, validDocIds, queryableDocIds, recordInfoIterator, false);
-    } finally {
-      segmentLock.unlock();
-    }
-  }
-
-  @VisibleForTesting
-  void addSegmentUnsafe(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
-      boolean isPreloading) {
     if (validDocIds == null) {
       validDocIds = new ThreadSafeMutableRoaringBitmap();
     }
     if (queryableDocIds == null && _deleteRecordColumn != null) {
       queryableDocIds = new ThreadSafeMutableRoaringBitmap();
     }
-    if (isPreloading) {
-      addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
-    } else {
+    addSegmentWithoutUpsert(segment, validDocIds, queryableDocIds, recordInfoIterator);
+  }
+
+  /**
+   * NOTE: We allow passing in validDocIds and queryableDocIds here so that the value can be easily accessed from the
+   *       tests. The passed in bitmaps should always be empty.
+   */
+  @VisibleForTesting
+  public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
+    String segmentName = segment.getSegmentName();
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      if (validDocIds == null) {
+        validDocIds = new ThreadSafeMutableRoaringBitmap();
+      }
+      if (queryableDocIds == null && _deleteRecordColumn != null) {
+        queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+      }
       addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
+    } finally {
+      segmentLock.unlock();
     }
   }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 8a3949a9eb..35e621245c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -844,8 +844,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment1 =
         mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
     // Preloading segment adds the segment without checking for upsert.
-    upsertMetadataManager.addSegmentUnsafe(segment1, validDocIds1, null,
-        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+    upsertMetadataManager.doPreloadSegment(segment1, validDocIds1, null,
+        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -861,8 +861,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 =
         mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
-    upsertMetadataManager.addSegmentUnsafe(segment2, validDocIds2, null,
-        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator(), true);
+    upsertMetadataManager.doPreloadSegment(segment2, validDocIds2, null,
+        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 0 -> {0, 1}, 1 -> {1, 2}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org