You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/20 19:44:34 UTC

[pinot] branch master updated: Extract the common logic for upsert metadata manager (#9435)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ab2db0c693 Extract the common logic for upsert metadata manager (#9435)
ab2db0c693 is described below

commit ab2db0c69340214bbee4ca901616696ae396ed87
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 20 12:44:27 2022 -0700

    Extract the common logic for upsert metadata manager (#9435)
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 255 +++++++++++++++++++++
 ...oncurrentMapPartitionUpsertMetadataManager.java | 243 ++------------------
 .../ConcurrentMapTableUpsertMetadataManager.java   |   4 +-
 3 files changed, 283 insertions(+), 219 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
new file mode 100644
index 0000000000..41f6fa9f3c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base implementation of {@link PartitionUpsertMetadataManager} that holds the segment-level workflow shared by all
+ * upsert metadata managers of a table partition: adding, replacing and removing segments under the per-segment lock,
+ * skipping removal of already replaced segments, reporting the primary key count gauge and rate-limiting
+ * out-of-order event warnings. Subclasses own the primary key storage and implement {@link #getNumPrimaryKeys()},
+ * {@link #addOrReplaceSegment} and {@link #removeSegment(IndexSegment, MutableRoaringBitmap)}.
+ */
+@ThreadSafe
+public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {
+  // Minimum interval between two consecutive out-of-order event warning logs
+  protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
+
+  protected final String _tableNameWithType;
+  protected final int _partitionId;
+  protected final List<String> _primaryKeyColumns;
+  protected final String _comparisonColumn;
+  protected final HashFunction _hashFunction;
+  protected final PartialUpsertHandler _partialUpsertHandler;
+  protected final ServerMetrics _serverMetrics;
+  protected final Logger _logger;
+
+  // Old segments already handled by replaceSegment(); a later removeSegment() call for them is skipped
+  @VisibleForTesting
+  public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
+
+  // Rate-limiting state for out-of-order event warnings; Long.MIN_VALUE means nothing has been logged yet
+  protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+  protected int _numOutOfOrderEvents = 0;
+
+  protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
+      List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
+      @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
+    _tableNameWithType = tableNameWithType;
+    _partitionId = partitionId;
+    _primaryKeyColumns = primaryKeyColumns;
+    _comparisonColumn = comparisonColumn;
+    _hashFunction = hashFunction;
+    _partialUpsertHandler = partialUpsertHandler;
+    _serverMetrics = serverMetrics;
+    _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
+  }
+
+  @Override
+  public List<String> getPrimaryKeyColumns() {
+    return _primaryKeyColumns;
+  }
+
+  @Override
+  public void addSegment(ImmutableSegment segment) {
+    addSegment(segment, null, null);
+  }
+
+  /**
+   * Adds the records of an immutable segment to the upsert metadata while holding the segment lock. Empty segments
+   * are skipped.
+   *
+   * @param segment segment to add; must be an {@link ImmutableSegmentImpl} unless it is an {@link EmptyIndexSegment}
+   * @param validDocIds valid doc ids bitmap handed to {@link #addOrReplaceSegment}, or {@code null} to allocate one
+   * @param recordInfoIterator records to add, or {@code null} to read them from the segment
+   * @throws IllegalArgumentException if the segment implementation is not supported
+   */
+  @VisibleForTesting
+  public void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable Iterator<RecordInfo> recordInfoIterator) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
+
+    if (segment instanceof EmptyIndexSegment) {
+      _logger.info("Skip adding empty segment: {}", segmentName);
+      return;
+    }
+
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+          "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+          _tableNameWithType);
+      if (validDocIds == null) {
+        validDocIds = new ThreadSafeMutableRoaringBitmap();
+      }
+      if (recordInfoIterator == null) {
+        recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
+      }
+      addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null);
+    } finally {
+      segmentLock.unlock();
+    }
+
+    // Update metrics
+    long numPrimaryKeys = getNumPrimaryKeys();
+    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+        numPrimaryKeys);
+
+    _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+  }
+
+  /** Returns the number of primary keys currently tracked for this partition; reported as a gauge metric. */
+  protected abstract long getNumPrimaryKeys();
+
+  /**
+   * Adds the records of {@code segment} to the primary key storage; always called while holding the segment lock.
+   * When {@code oldSegment} is non-null the new segment replaces it, and any doc ids still set in
+   * {@code validDocIdsForOldSegment} afterwards are treated by the caller as primary keys that were not replaced.
+   */
+  protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
+      @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
+
+  @Override
+  public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
+    replaceSegment(segment, null, null, oldSegment);
+  }
+
+  /**
+   * Replaces {@code oldSegment} with {@code segment} (same segment name) while holding the segment lock.
+   * Primary keys of the old segment that were not replaced by the new one are removed, and the old segment (unless
+   * it is an {@link EmptyIndexSegment}) is remembered so that a later {@link #removeSegment(IndexSegment)} call for
+   * it is skipped.
+   *
+   * @param segment new segment; must be an {@link ImmutableSegmentImpl} unless it is an {@link EmptyIndexSegment}
+   * @param validDocIds valid doc ids bitmap for the new segment, or {@code null} to allocate one
+   * @param recordInfoIterator records of the new segment, or {@code null} to read them from the segment
+   * @param oldSegment segment being replaced
+   * @throws IllegalArgumentException if the segment names differ or the segment implementation is not supported
+   */
+  @VisibleForTesting
+  public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
+    String segmentName = segment.getSegmentName();
+    Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
+        "Cannot replace segment with different name for table: %s, old segment: %s, new segment: %s",
+        _tableNameWithType, oldSegment.getSegmentName(), segmentName);
+    _logger.info("Replacing {} segment: {}, current primary key count: {}",
+        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      MutableRoaringBitmap validDocIdsForOldSegment =
+          oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (segment instanceof EmptyIndexSegment) {
+        _logger.info("Skip adding empty segment: {}", segmentName);
+      } else {
+        Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+            "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+            _tableNameWithType);
+        if (validDocIds == null) {
+          validDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        if (recordInfoIterator == null) {
+          recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
+        }
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
+            validDocIdsForOldSegment);
+      }
+
+      if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
+        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+        if (_partialUpsertHandler != null) {
+          // For partial-upsert table, because we do not restore the original record location when removing the primary
+          // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
+          // consuming segment is replaced by a committed segment that is consumed from a different server with
+          // different records (some stream consumer cannot guarantee consuming the messages in the same order).
+          _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+              + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+          _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+              numKeysNotReplaced);
+        } else {
+          _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced,
+              segmentName);
+        }
+        removeSegment(oldSegment, validDocIdsForOldSegment);
+      }
+    } finally {
+      segmentLock.unlock();
+    }
+
+    if (!(oldSegment instanceof EmptyIndexSegment)) {
+      _replacedSegments.add(oldSegment);
+    }
+
+    // Update metrics
+    long numPrimaryKeys = getNumPrimaryKeys();
+    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+        numPrimaryKeys);
+
+    _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+  }
+
+  /**
+   * Removes the primary keys of the given valid doc ids of {@code segment}; always called with a non-empty bitmap
+   * while holding the segment lock.
+   */
+  protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
+
+  @Override
+  public void removeSegment(IndexSegment segment) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Removing {} segment: {}, current primary key count: {}",
+        segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+
+    // The keys of a replaced segment were already handled by replaceSegment()
+    if (_replacedSegments.remove(segment)) {
+      _logger.info("Skip removing replaced segment: {}", segmentName);
+      return;
+    }
+
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      MutableRoaringBitmap validDocIds =
+          segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (validDocIds == null || validDocIds.isEmpty()) {
+        _logger.info("Skip removing segment without valid docs: {}", segmentName);
+        return;
+      }
+
+      _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
+      removeSegment(segment, validDocIds);
+    } finally {
+      segmentLock.unlock();
+    }
+
+    // Update metrics
+    long numPrimaryKeys = getNumPrimaryKeys();
+    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+        numPrimaryKeys);
+
+    _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+  }
+
+  /**
+   * Records an out-of-order event for a partial-upsert table. A warning with the number of events skipped since the
+   * previous warning is logged at most once per {@link #OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS}.
+   */
+  protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) {
+    _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
+    _numOutOfOrderEvents++;
+    long currentTimeNs = System.nanoTime();
+    // Check the Long.MIN_VALUE sentinel explicitly: "currentTimeNs - Long.MIN_VALUE" overflows to a negative value
+    // for any non-negative nanoTime(), which would suppress the warning forever
+    if (_lastOutOfOrderEventReportTimeNs == Long.MIN_VALUE
+        || currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
+      _logger.warn("Skipped {} out-of-order events for partial-upsert table (the last event has current comparison "
+              + "value: {}, record comparison value: {})", _numOutOfOrderEvents, currentComparisonValue,
+          recordComparisonValue);
+      _lastOutOfOrderEventReportTimeNs = currentTimeNs;
+      _numOutOfOrderEvents = 0;
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _logger.info("Closing the metadata manager, current primary key count: {}", getNumPrimaryKeys());
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 1654eaca0d..125f61fe47 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,27 +19,20 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.SegmentLocks;
-import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
@@ -48,8 +41,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -57,92 +48,28 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 @ThreadSafe
-public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {
-  private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
+public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {
 
-  private final String _tableNameWithType;
-  private final int _partitionId;
-  private final List<String> _primaryKeyColumns;
-  private final String _comparisonColumn;
-  private final HashFunction _hashFunction;
-  private final PartialUpsertHandler _partialUpsertHandler;
-  private final ServerMetrics _serverMetrics;
-  private final Logger _logger;
-
-  // TODO(upsert): consider an off-heap KV store to persist this mapping to improve the recovery speed.
   @VisibleForTesting
   final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
-  @VisibleForTesting
-  final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
-
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
 
-  private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
-  private int _numOutOfOrderEvents = 0;
-
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
       @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
-    _tableNameWithType = tableNameWithType;
-    _partitionId = partitionId;
-    _primaryKeyColumns = primaryKeyColumns;
-    _comparisonColumn = comparisonColumn;
-    _hashFunction = hashFunction;
-    _partialUpsertHandler = partialUpsertHandler;
-    _serverMetrics = serverMetrics;
-    _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
+    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumn, hashFunction, partialUpsertHandler,
+        serverMetrics); // The base class stores all arguments and creates the logger
   }
 
   @Override
-  public List<String> getPrimaryKeyColumns() {
-    return _primaryKeyColumns;
+  protected long getNumPrimaryKeys() {
+    return _primaryKeyToRecordLocationMap.size(); // Tracked primary keys, not the number of primary key columns
   }
 
   @Override
-  public void addSegment(ImmutableSegment segment) {
-    addSegment(segment, null, null);
-  }
-
-  @VisibleForTesting
-  void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable Iterator<RecordInfo> recordInfoIterator) {
-    String segmentName = segment.getSegmentName();
-    _logger.info("Adding segment: {}, current primary key count: {}", segmentName,
-        _primaryKeyToRecordLocationMap.size());
-
-    if (segment instanceof EmptyIndexSegment) {
-      _logger.info("Skip adding empty segment: {}", segmentName);
-      return;
-    }
-
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
-    segmentLock.lock();
-    try {
-      Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
-          "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
-          _tableNameWithType);
-      if (validDocIds == null) {
-        validDocIds = new ThreadSafeMutableRoaringBitmap();
-      }
-      if (recordInfoIterator == null) {
-        recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
-      }
-      addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null);
-    } finally {
-      segmentLock.unlock();
-    }
-
-    // Update metrics
-    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
-    _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
-  }
-
-  private void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+  protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
       Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
       @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
@@ -230,6 +157,24 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  /**
+   * Removes the primary keys of the given doc ids whose record location still points at {@code segment}.
+   */
+  @Override
+  protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+    assert !validDocIds.isEmpty();
+    // A single PrimaryKey buffer is refilled for every doc id to avoid per-doc allocation
+    PrimaryKey reusedPrimaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
+    PeekableIntIterator docIdIterator = validDocIds.getIntIterator();
+    while (docIdIterator.hasNext()) {
+      UpsertUtils.getPrimaryKey(segment, _primaryKeyColumns, docIdIterator.next(), reusedPrimaryKey);
+      Object hashedPrimaryKey = HashUtils.hashPrimaryKey(reusedPrimaryKey, _hashFunction);
+      // Returning null from the remapping function removes the entry; keys now owned by another segment are kept
+      _primaryKeyToRecordLocationMap.computeIfPresent(hashedPrimaryKey,
+          (key, location) -> location.getSegment() == segment ? null : location);
+    }
+  }
+
   @Override
   public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
     ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
@@ -265,129 +210,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
         _primaryKeyToRecordLocationMap.size());
   }
 
-  @Override
-  public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
-    replaceSegment(segment, null, null, oldSegment);
-  }
-
-  @VisibleForTesting
-  void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
-    String segmentName = segment.getSegmentName();
-    Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
-        "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}",
-        _tableNameWithType, oldSegment.getSegmentName(), segmentName);
-    _logger.info("Replacing {} segment: {}, current primary key count: {}",
-        oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
-        _primaryKeyToRecordLocationMap.size());
-
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
-    segmentLock.lock();
-    try {
-      MutableRoaringBitmap validDocIdsForOldSegment =
-          oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
-      if (segment instanceof EmptyIndexSegment) {
-        _logger.info("Skip adding empty segment: {}", segmentName);
-      } else {
-        Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
-            "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
-            _tableNameWithType);
-        if (validDocIds == null) {
-          validDocIds = new ThreadSafeMutableRoaringBitmap();
-        }
-        if (recordInfoIterator == null) {
-          recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
-        }
-        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
-            validDocIdsForOldSegment);
-      }
-
-      if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
-        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
-        if (_partialUpsertHandler != null) {
-          // For partial-upsert table, because we do not restore the original record location when removing the primary
-          // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
-          // consuming segment is replaced by a committed segment that is consumed from a different server with
-          // different records (some stream consumer cannot guarantee consuming the messages in the same order).
-          _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
-              + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
-          _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
-              numKeysNotReplaced);
-        } else {
-          _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced,
-              segmentName);
-        }
-        removeSegment(oldSegment, validDocIdsForOldSegment);
-      }
-    } finally {
-      segmentLock.unlock();
-    }
-
-    if (!(oldSegment instanceof EmptyIndexSegment)) {
-      _replacedSegments.add(oldSegment);
-    }
-
-    // Update metrics
-    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
-    _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
-  }
-
-  @Override
-  public void removeSegment(IndexSegment segment) {
-    String segmentName = segment.getSegmentName();
-    _logger.info("Removing {} segment: {}, current primary key count: {}",
-        segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
-        _primaryKeyToRecordLocationMap.size());
-
-    if (_replacedSegments.remove(segment)) {
-      _logger.info("Skip removing replaced segment: {}", segmentName);
-      return;
-    }
-
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
-    segmentLock.lock();
-    try {
-      MutableRoaringBitmap validDocIds =
-          segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
-      if (validDocIds == null || validDocIds.isEmpty()) {
-        _logger.info("Skip removing segment without valid docs: {}", segmentName);
-        return;
-      }
-
-      _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
-      removeSegment(segment, validDocIds);
-    } finally {
-      segmentLock.unlock();
-    }
-
-    // Update metrics
-    int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
-    _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
-  }
-
-  private void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
-    assert !validDocIds.isEmpty();
-    PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
-    PeekableIntIterator iterator = validDocIds.getIntIterator();
-    while (iterator.hasNext()) {
-      int docId = iterator.next();
-      UpsertUtils.getPrimaryKey(segment, _primaryKeyColumns, docId, primaryKey);
-      _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
-          (pk, recordLocation) -> {
-            if (recordLocation.getSegment() == segment) {
-              return null;
-            }
-            return recordLocation;
-          });
-    }
-  }
-
   @Override
   public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
     // Directly return the record when partial-upsert is not enabled
@@ -410,16 +232,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
       if (previousRecord != null) {
         return _partialUpsertHandler.merge(previousRecord, record);
       } else {
-        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
-        _numOutOfOrderEvents++;
-        long currentTimeNs = System.nanoTime();
-        if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
-          _logger.warn("Skipped {} out-of-order events for partial-upsert table (the last event has current comparison "
-                  + "value: {}, record comparison value: {})", _numOutOfOrderEvents,
-              currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
-          _lastOutOfOrderEventReportTimeNs = currentTimeNs;
-          _numOutOfOrderEvents = 0;
-        }
+        handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
         return record;
       }
     } else {
@@ -428,12 +241,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
-  @Override
-  public void close() {
-    _logger.info("Closing metadata manager for table {} and partition {}, current primary key count: {}",
-        _tableNameWithType, _partitionId, _primaryKeyToRecordLocationMap.size());
-  }
-
   @VisibleForTesting
   static class RecordLocation {
     private final IndexSegment _segment;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 5c5c357079..9c22316703 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.concurrent.ThreadSafe;
@@ -39,7 +40,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
   }
 
   @Override
-  public void close() {
+  public void close()
+      throws IOException {
     for (ConcurrentMapPartitionUpsertMetadataManager partitionUpsertMetadataManager
         : _partitionMetadataManagerMap.values()) {
       partitionUpsertMetadataManager.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org