You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/22 18:59:18 UTC

[pinot] branch master updated: Take upsert snapshot when creating new consuming segment (#10928)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c92b32711 Take upsert snapshot when creating new consuming segment (#10928)
3c92b32711 is described below

commit 3c92b327114c4534f354e8a8f2755197a6439fd5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jun 22 11:59:10 2023 -0700

    Take upsert snapshot when creating new consuming segment (#10928)
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  31 ++--
 .../immutable/ImmutableSegmentImpl.java            |   9 +-
 .../upsert/BasePartitionUpsertMetadataManager.java | 158 +++++++++++++++------
 .../upsert/PartitionUpsertMetadataManager.java     |   6 +
 .../ImmutableSegmentImplUpsertSnapshotTest.java    | 149 -------------------
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  17 ++-
 6 files changed, 161 insertions(+), 209 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0e1a70bfc2..99f3c6d802 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -232,6 +232,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // consuming.
   private final AtomicBoolean _acquiredConsumerSemaphore;
   private final ServerMetrics _serverMetrics;
+  private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final BooleanSupplier _isReadyToConsumeData;
   private final MutableSegmentImpl _realtimeSegment;
   private volatile StreamPartitionMsgOffset _currentOffset;
@@ -400,13 +401,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // anymore. Remove the file if it exists.
     removeSegmentFile();
 
-    if (!_isReadyToConsumeData.getAsBoolean()) {
-      do {
-        //noinspection BusyWait
-        Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
-      } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
-    }
-
     _numRowsErrored = 0;
     final long idlePipeSleepTimeMillis = 100;
     final long idleTimeoutMillis = _partitionLevelStreamConfig.getIdleTimeoutMillis();
@@ -662,6 +656,27 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        // TODO:
+        //   When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is
+        //   no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It
+        //   can potentially cause the following problems:
+        //   1. The snapshot for the previous consuming segment might not be taken since it is not persisted yet
+        //   2. If the previous consuming segment is dropped but immutable segment is not downloaded and replaced yet,
+        //      it might cause inconsistency (especially for partial upsert because events are not consumed in sequence)
+        //   To address this problem, we should consider releasing the consumer semaphore after the consuming segment is
+        //   persisted.
+        // Take upsert snapshot before starting consuming events
+        if (_partitionUpsertMetadataManager != null) {
+          _partitionUpsertMetadataManager.takeSnapshot();
+        }
+
         while (!_state.isFinal()) {
           if (_state.shouldConsume()) {
             consumeLoop();  // Consume until we reached the end criteria, or we are stopped.
@@ -863,7 +878,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     if (_partitionMetadataProvider == null) {
       createPartitionMetadataProvider("Get Partition Lag State");
     }
-    ;
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
@@ -1309,6 +1323,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _serverMetrics = serverMetrics;
+    _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _isReadyToConsumeData = isReadyToConsumeData;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 1e00f85077..66875eb726 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -123,7 +123,7 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
     return null;
   }
 
-  public void persistValidDocIdsSnapshot(MutableRoaringBitmap validDocIds) {
+  public void persistValidDocIdsSnapshot() {
     File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
     try {
       if (validDocIdsSnapshotFile.exists()) {
@@ -132,14 +132,15 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
           return;
         }
       }
+      MutableRoaringBitmap validDocIdsSnapshot = _validDocIds.getMutableRoaringBitmap();
       try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(validDocIdsSnapshotFile))) {
-        validDocIds.serialize(dataOutputStream);
+        validDocIdsSnapshot.serialize(dataOutputStream);
       }
       LOGGER.info("Persisted valid doc ids for segment: {} with: {} valid docs", getSegmentName(),
-          validDocIds.getCardinality());
+          validDocIdsSnapshot.getCardinality());
     } catch (Exception e) {
       LOGGER.warn("Caught exception while persisting valid doc ids to snapshot file: {}, skipping",
-          validDocIdsSnapshotFile);
+          validDocIdsSnapshotFile, e);
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 6556ffce8a..1501cd28e6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
@@ -61,8 +63,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
 
-  @VisibleForTesting
-  public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
+  // Tracks all the segments managed by this manager (excluding EmptySegment)
+  protected final Set<IndexSegment> _trackedSegments = ConcurrentHashMap.newKeySet();
+
+  // NOTE: We do not persist snapshot on the first consuming segment because most segments might not be loaded yet
+  protected volatile boolean _gotFirstConsumingSegment = false;
+  protected final ReadWriteLock _snapshotLock;
 
   protected volatile boolean _stopped = false;
   // Initialize with 1 pending operation to indicate the metadata manager can take more operations
@@ -81,6 +87,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _hashFunction = hashFunction;
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
+    _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
     _serverMetrics = serverMetrics;
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
   }
@@ -92,44 +99,51 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   @Override
   public void addSegment(ImmutableSegment segment) {
+    String segmentName = segment.getSegmentName();
     if (_stopped) {
       _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
       return;
     }
+    if (segment instanceof EmptyIndexSegment) {
+      _logger.info("Skip adding empty segment: {}", segmentName);
+      return;
+    }
+    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+        "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+        _tableNameWithType);
+
+    if (_enableSnapshot) {
+      _snapshotLock.readLock().lock();
+    }
     startOperation();
     try {
-      doAddSegment(segment);
+      doAddSegment((ImmutableSegmentImpl) segment);
+      _trackedSegments.add(segment);
     } finally {
       finishOperation();
+      if (_enableSnapshot) {
+        _snapshotLock.readLock().unlock();
+      }
     }
   }
 
-  protected void doAddSegment(ImmutableSegment segment) {
+  protected void doAddSegment(ImmutableSegmentImpl segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
 
-    if (segment instanceof EmptyIndexSegment) {
-      _logger.info("Skip adding empty segment: {}", segmentName);
-      return;
-    }
-
-    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
-        "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
-        _tableNameWithType);
-
-    ImmutableSegmentImpl immutableSegmentImpl = (ImmutableSegmentImpl) segment;
     MutableRoaringBitmap validDocIds;
     if (_enableSnapshot) {
-      validDocIds = immutableSegmentImpl.loadValidDocIdsFromSnapshot();
+      validDocIds = segment.loadValidDocIdsFromSnapshot();
       if (validDocIds != null && validDocIds.isEmpty()) {
         _logger.info("Skip adding segment: {} without valid doc, current primary key count: {}",
             segment.getSegmentName(), getNumPrimaryKeys());
-       immutableSegmentImpl.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
+        segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
         return;
       }
     } else {
       validDocIds = null;
-      immutableSegmentImpl.deleteValidDocIdsSnapshot();
+      segment.deleteValidDocIdsSnapshot();
     }
 
     try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
@@ -141,7 +155,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         recordInfoIterator =
             UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
       }
-      addSegment(immutableSegmentImpl, null, recordInfoIterator);
+      addSegment(segment, null, recordInfoIterator);
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while adding segment: %s, table: %s", segmentName, _tableNameWithType), e);
@@ -152,7 +166,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
 
-    _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+    _logger.info("Finished adding segment: {} in {}ms, current primary key count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
   /**
@@ -188,9 +203,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
           segment.getSegmentName());
       return;
     }
+
+    // NOTE: We don't acquire snapshot read lock here because snapshot is always taken before a new consuming segment
+    //       starts consuming, so it won't overlap with this method
+    _gotFirstConsumingSegment = true;
     startOperation();
     try {
       doAddRecord(segment, recordInfo);
+      _trackedSegments.add(segment);
     } finally {
       finishOperation();
     }
@@ -204,11 +224,22 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       _logger.info("Skip replacing segment: {} because metadata manager is already stopped", segment.getSegmentName());
       return;
     }
+
+    if (_enableSnapshot) {
+      _snapshotLock.readLock().lock();
+    }
     startOperation();
     try {
       doReplaceSegment(segment, oldSegment);
+      if (!(segment instanceof EmptyIndexSegment)) {
+        _trackedSegments.add(segment);
+      }
+      _trackedSegments.remove(oldSegment);
     } finally {
       finishOperation();
+      if (_enableSnapshot) {
+        _snapshotLock.readLock().unlock();
+      }
     }
   }
 
@@ -219,6 +250,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         _tableNameWithType, oldSegment.getSegmentName(), segmentName);
     _logger.info("Replacing {} segment: {}, current primary key count: {}",
         oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
 
     if (segment instanceof EmptyIndexSegment) {
       _logger.info("Skip adding empty segment: {}", segmentName);
@@ -241,7 +273,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
 
-    _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+    _logger.info("Finished replacing segment: {} in {}ms, current primary key count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
   /**
@@ -288,10 +321,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     } finally {
       segmentLock.unlock();
     }
-
-    if (!(oldSegment instanceof EmptyIndexSegment)) {
-      _replacedSegments.add(oldSegment);
-    }
   }
 
   protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
@@ -299,29 +328,27 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   @Override
   public void removeSegment(IndexSegment segment) {
     String segmentName = segment.getSegmentName();
-    if (_replacedSegments.remove(segment)) {
-      _logger.info("Skip removing replaced segment: {}", segmentName);
+    if (_stopped) {
+      _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
       return;
     }
-    // Allow persisting valid doc ids snapshot after metadata manager is stopped
-    MutableRoaringBitmap validDocIds =
-        segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
-    if (_enableSnapshot && segment instanceof ImmutableSegmentImpl) {
-      ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
-    }
-    if (validDocIds == null || validDocIds.isEmpty()) {
-      _logger.info("Skip removing segment without valid docs: {}", segmentName);
+    if (!_trackedSegments.contains(segment)) {
+      _logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName);
       return;
     }
-    if (_stopped) {
-      _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName);
-      return;
+
+    if (_enableSnapshot) {
+      _snapshotLock.readLock().lock();
     }
     startOperation();
     try {
       doRemoveSegment(segment);
+      _trackedSegments.remove(segment);
     } finally {
       finishOperation();
+      if (_enableSnapshot) {
+        _snapshotLock.readLock().unlock();
+      }
     }
   }
 
@@ -329,13 +356,18 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     String segmentName = segment.getSegmentName();
     _logger.info("Removing {} segment: {}, current primary key count: {}",
         segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+    long startTimeMs = System.currentTimeMillis();
+
+    MutableRoaringBitmap validDocIds =
+        segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+    if (validDocIds == null || validDocIds.isEmpty()) {
+      _logger.info("Skip removing segment without valid docs: {}", segmentName);
+      return;
+    }
 
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
-      assert segment.getValidDocIds() != null;
-      MutableRoaringBitmap validDocIds = segment.getValidDocIds().getMutableRoaringBitmap();
-      assert validDocIds != null;
       _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
       removeSegment(segment, validDocIds);
     } finally {
@@ -347,7 +379,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
 
-    _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+    _logger.info("Finished removing segment: {} in {}ms, current primary key count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
   @Override
@@ -360,6 +393,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       _logger.debug("Skip updating record because metadata manager is already stopped");
       return record;
     }
+
     startOperation();
     try {
       return doUpdateRecord(record, recordInfo);
@@ -385,6 +419,48 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  @Override
+  public void takeSnapshot() {
+    if (!_enableSnapshot) {
+      return;
+    }
+    if (_stopped) {
+      _logger.info("Skip taking snapshot because metadata manager is already stopped");
+      return;
+    }
+    if (!_gotFirstConsumingSegment) {
+      _logger.info("Skip taking snapshot before getting the first consuming segment");
+      return;
+    }
+
+    _snapshotLock.writeLock().lock();
+    startOperation();
+    try {
+      doTakeSnapshot();
+    } finally {
+      finishOperation();
+      _snapshotLock.writeLock().unlock();
+    }
+  }
+
+  // TODO: Consider optimizing it by tracking and persisting only the changed snapshot
+  protected void doTakeSnapshot() {
+    int numTrackedSegments = _trackedSegments.size();
+    _logger.info("Taking snapshot for {} segments", numTrackedSegments);
+    long startTimeMs = System.currentTimeMillis();
+
+    int numImmutableSegments = 0;
+    for (IndexSegment segment : _trackedSegments) {
+      if (segment instanceof ImmutableSegmentImpl) {
+        ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot();
+        numImmutableSegments++;
+      }
+    }
+
+    _logger.info("Finished taking snapshot for {} immutable segments (out of {} total segments) in {}ms",
+        numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
+  }
+
   protected void startOperation() {
     _numPendingOperations.getAndIncrement();
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index dd07143fd3..55cd8497cb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -84,6 +84,12 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    */
   GenericRow updateRecord(GenericRow record, RecordInfo recordInfo);
 
+  /**
+   * Takes snapshot for all the tracked immutable segments when snapshot is enabled. This method should be invoked
+   * before a new consuming segment starts consuming.
+   */
+  void takeSnapshot();
+
   /**
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
deleted file mode 100644
index 4bb049cc2c..0000000000
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.indexsegment.immutable;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
-import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
-import org.apache.pinot.segment.spi.V1Constants;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
-import org.apache.pinot.spi.config.table.HashFunction;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.Mockito;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-
-public class ImmutableSegmentImplUpsertSnapshotTest {
-  private static final String AVRO_FILE = "data/test_data-mv.avro";
-  private static final String SCHEMA = "data/testDataMVSchema.json";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ImmutableSegmentImplTest");
-
-  private SegmentDirectory _segmentDirectory;
-  private SegmentMetadataImpl _segmentMetadata;
-  private PinotConfiguration _configuration;
-  private TableConfig _tableConfig;
-  private Schema _schema;
-
-  private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
-  private ImmutableSegmentImpl _immutableSegmentImpl;
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteQuietly(INDEX_DIR);
-
-    Map<String, Object> props = new HashMap<>();
-    props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
-    _configuration = new PinotConfiguration(props);
-
-    _segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(INDEX_DIR.toURI(),
-        new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
-
-    URL resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(AVRO_FILE);
-    Assert.assertNotNull(resourceUrl);
-    File avroFile = new File(resourceUrl.getFile());
-
-    IngestionConfig ingestionConfig = new IngestionConfig();
-    ingestionConfig.setRowTimeValueCheck(false);
-    ingestionConfig.setSegmentTimeValueCheck(false);
-
-    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    upsertConfig.setEnableSnapshot(true);
-
-    _tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
-            .setIngestionConfig(ingestionConfig).setUpsertConfig(upsertConfig).build();
-
-    resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(SCHEMA);
-    _schema = Schema.fromFile(new File(resourceUrl.getFile()));
-
-    SegmentGeneratorConfig config =
-        SegmentTestUtils.getSegmentGeneratorConfigWithSchema(avroFile, INDEX_DIR, "testTable", _tableConfig, _schema);
-
-    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(config);
-    driver.build();
-    _segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
-    Mockito.when(_segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap<>());
-    Mockito.when(_segmentMetadata.getIndexDir()).thenReturn(INDEX_DIR);
-    _immutableSegmentImpl = new ImmutableSegmentImpl(_segmentDirectory, _segmentMetadata, new HashMap<>(), null);
-
-    ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
-    _partitionUpsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, true, serverMetrics);
-
-    _immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap());
-  }
-
-  @Test
-  public void testPersistValidDocIdsSnapshot() {
-    int[] docIds1 = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
-    MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
-    validDocIds.add(docIds1);
-
-    _immutableSegmentImpl.persistValidDocIdsSnapshot(validDocIds);
-    assertTrue(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
-        V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
-
-    MutableRoaringBitmap bitmap = _immutableSegmentImpl.loadValidDocIdsFromSnapshot();
-    assertEquals(bitmap.toArray(), docIds1);
-
-    _immutableSegmentImpl.deleteValidDocIdsSnapshot();
-    assertFalse(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
-        V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
-  }
-
-  @AfterMethod
-  public void tearDown() {
-    FileUtils.deleteQuietly(INDEX_DIR);
-  }
-}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 1e3f130059..a89f7e9d64 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
@@ -53,7 +54,6 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
 
 
 public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@@ -78,6 +78,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
     // Add the first segment
     int numRecords = 6;
@@ -99,6 +100,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
     }
     upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
+    trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     assertEquals(recordLocationMap.size(), 3);
     checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
@@ -125,6 +127,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps);
     }
     upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator());
+    trackedSegments.add(segment2);
 
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
@@ -153,6 +156,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1);
     upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1);
+    trackedSegments.add(newSegment1);
+    trackedSegments.remove(segment1);
     // original segment1: 1 -> {4, 120} (not in the map)
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
@@ -164,7 +169,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    assertEquals(upsertMetadataManager._replacedSegments, Collections.singleton(segment1));
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1);
@@ -178,7 +182,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    assertTrue(upsertMetadataManager._replacedSegments.isEmpty());
 
     // Remove the empty segment
     upsertMetadataManager.removeSegment(emptySegment);
@@ -200,6 +203,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
 
     // Stop the metadata manager
     upsertMetadataManager.stop();
@@ -210,6 +214,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(recordLocationMap.size(), 1);
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
 
     // Close the metadata manager
     upsertMetadataManager.close();
@@ -218,8 +223,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
     List<RecordInfo> recordInfoList = new ArrayList<>();
     for (int i = 0; i < numRecords; i++) {
-      recordInfoList.add(
-          new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
+      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
     }
     return recordInfoList;
   }
@@ -288,8 +292,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertNotNull(recordLocation);
     assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
-    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value,
-        comparisonValue);
+    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org