You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/20 19:44:34 UTC
[pinot] branch master updated: Extract the common logic for upsert metadata manager (#9435)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ab2db0c693 Extract the common logic for upsert metadata manager (#9435)
ab2db0c693 is described below
commit ab2db0c69340214bbee4ca901616696ae396ed87
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 20 12:44:27 2022 -0700
Extract the common logic for upsert metadata manager (#9435)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 255 +++++++++++++++++++++
...oncurrentMapPartitionUpsertMetadataManager.java | 243 ++------------------
.../ConcurrentMapTableUpsertMetadataManager.java | 4 +-
3 files changed, 283 insertions(+), 219 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
new file mode 100644
index 0000000000..41f6fa9f3c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@ThreadSafe
+public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {
+ protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
+
+ protected final String _tableNameWithType;
+ protected final int _partitionId;
+ protected final List<String> _primaryKeyColumns;
+ protected final String _comparisonColumn;
+ protected final HashFunction _hashFunction;
+ protected final PartialUpsertHandler _partialUpsertHandler;
+ protected final ServerMetrics _serverMetrics;
+ protected final Logger _logger;
+
+ @VisibleForTesting
+ public final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
+
+ protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+ protected int _numOutOfOrderEvents = 0;
+
+ /**
+  * Stores the per-partition upsert settings shared by all implementations and creates a logger whose name is
+  * built from the table name, the partition id and the simple name of the concrete class.
+  *
+  * @param tableNameWithType table name used for metrics, segment locks and the logger name
+  * @param partitionId partition id used for the partition gauge and the logger name
+  * @param primaryKeyColumns primary key columns, exposed through {@link #getPrimaryKeyColumns()}
+  * @param comparisonColumn comparison column handed to {@code UpsertUtils.getRecordInfoIterator}
+  * @param hashFunction hash function kept for subclasses
+  * @param partialUpsertHandler partial-upsert handler; {@code null} is accepted
+  * @param serverMetrics metrics sink for gauges and meters
+  */
+ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
+     List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
+     @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
+   String loggerName = tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName();
+
+   _tableNameWithType = tableNameWithType;
+   _partitionId = partitionId;
+   _primaryKeyColumns = primaryKeyColumns;
+   _comparisonColumn = comparisonColumn;
+   _hashFunction = hashFunction;
+   _partialUpsertHandler = partialUpsertHandler;
+   _serverMetrics = serverMetrics;
+   _logger = LoggerFactory.getLogger(loggerName);
+ }
+
+ /**
+  * Returns the primary key columns this manager was constructed with (the same list instance, not a copy).
+  */
+ @Override
+ public List<String> getPrimaryKeyColumns() {
+   return _primaryKeyColumns;
+ }
+
+ /**
+  * Adds an immutable segment, letting the three-argument overload create the valid doc ids bitmap and the record
+  * info iterator itself.
+  */
+ @Override
+ public void addSegment(ImmutableSegment segment) {
+   ThreadSafeMutableRoaringBitmap defaultValidDocIds = null;
+   Iterator<RecordInfo> defaultRecordInfoIterator = null;
+   addSegment(segment, defaultValidDocIds, defaultRecordInfoIterator);
+ }
+
+ /**
+  * Adds an immutable segment to the upsert metadata while holding the segment lock, then refreshes the
+  * primary-key-count gauge.
+  *
+  * <p>An {@link EmptyIndexSegment} is skipped without taking the lock or touching the gauge.
+  *
+  * @param segment segment to add; must be an {@link ImmutableSegmentImpl} unless it is an empty segment
+  * @param validDocIds bitmap to use for the segment, or {@code null} to create a fresh one
+  * @param recordInfoIterator records to register, or {@code null} to read them from the segment via
+  *     {@code UpsertUtils.getRecordInfoIterator}
+  * @throws IllegalArgumentException if the segment is not an {@link ImmutableSegmentImpl}
+  */
+ @VisibleForTesting
+ public void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+     @Nullable Iterator<RecordInfo> recordInfoIterator) {
+   String segmentName = segment.getSegmentName();
+   _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
+
+   if (segment instanceof EmptyIndexSegment) {
+     _logger.info("Skip adding empty segment: {}", segmentName);
+     return;
+   }
+
+   Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+   segmentLock.lock();
+   try {
+     // Guava's Preconditions only substitutes "%s" placeholders; SLF4J-style "{}" would be left verbatim and the
+     // arguments appended in square brackets.
+     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+         "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+         _tableNameWithType);
+     if (validDocIds == null) {
+       validDocIds = new ThreadSafeMutableRoaringBitmap();
+     }
+     if (recordInfoIterator == null) {
+       recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
+     }
+     // No old segment is involved when adding, hence the two trailing nulls.
+     addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null);
+   } finally {
+     segmentLock.unlock();
+   }
+
+   // Update metrics
+   long numPrimaryKeys = getNumPrimaryKeys();
+   _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+       numPrimaryKeys);
+
+   _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+ }
+
+ /**
+  * Returns the current primary key count. The value is written to the log and published as the
+  * {@code UPSERT_PRIMARY_KEYS_COUNT} partition gauge after segments are added, replaced or removed.
+  */
+ protected abstract long getNumPrimaryKeys();
+
+ /**
+  * Registers the records of {@code segment} in the implementation's key store. Callers in this class invoke it
+  * while holding the segment lock.
+  *
+  * @param segment segment being added
+  * @param validDocIds valid doc ids bitmap for the new segment
+  * @param recordInfoIterator records of the new segment
+  * @param oldSegment segment being replaced, or {@code null} when the segment is simply added
+  * @param validDocIdsForOldSegment valid doc ids of the old segment, or {@code null} when there is none
+  */
+ protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+     Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
+     @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
+
+ /**
+  * Replaces {@code oldSegment} with {@code segment}, letting the four-argument overload create the valid doc ids
+  * bitmap and the record info iterator itself.
+  */
+ @Override
+ public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
+   ThreadSafeMutableRoaringBitmap defaultValidDocIds = null;
+   Iterator<RecordInfo> defaultRecordInfoIterator = null;
+   replaceSegment(segment, defaultValidDocIds, defaultRecordInfoIterator, oldSegment);
+ }
+
+ /**
+  * Replaces {@code oldSegment} with {@code segment} (both must share the same segment name) while holding the
+  * segment lock, removes the primary keys of the old segment that were not taken over by the new one, remembers
+  * the old segment as replaced, and refreshes the primary-key-count gauge.
+  *
+  * @param segment new segment; must be an {@link ImmutableSegmentImpl} unless it is an {@link EmptyIndexSegment}
+  * @param validDocIds bitmap to use for the new segment, or {@code null} to create a fresh one
+  * @param recordInfoIterator records of the new segment, or {@code null} to read them from the segment via
+  *     {@code UpsertUtils.getRecordInfoIterator}
+  * @param oldSegment segment being replaced
+  * @throws IllegalArgumentException if the segment names differ or the new segment has an unsupported type
+  */
+ @VisibleForTesting
+ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+     @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
+   String segmentName = segment.getSegmentName();
+   // Guava's Preconditions only substitutes "%s" placeholders; SLF4J-style "{}" would be left verbatim and the
+   // arguments appended in square brackets.
+   Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
+       "Cannot replace segment with different name for table: %s, old segment: %s, new segment: %s",
+       _tableNameWithType, oldSegment.getSegmentName(), segmentName);
+   _logger.info("Replacing {} segment: {}, current primary key count: {}",
+       oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys());
+
+   Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+   segmentLock.lock();
+   try {
+     MutableRoaringBitmap validDocIdsForOldSegment =
+         oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+     if (segment instanceof EmptyIndexSegment) {
+       _logger.info("Skip adding empty segment: {}", segmentName);
+     } else {
+       Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+           "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+           _tableNameWithType);
+       if (validDocIds == null) {
+         validDocIds = new ThreadSafeMutableRoaringBitmap();
+       }
+       if (recordInfoIterator == null) {
+         recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
+       }
+       addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
+           validDocIdsForOldSegment);
+     }
+
+     // Whatever is still marked valid in the old segment was not replaced by the new segment and must be removed.
+     if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
+       int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+       if (_partialUpsertHandler != null) {
+         // For partial-upsert table, because we do not restore the original record location when removing the primary
+         // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
+         // consuming segment is replaced by a committed segment that is consumed from a different server with
+         // different records (some stream consumer cannot guarantee consuming the messages in the same order).
+         _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+             + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+         _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+             numKeysNotReplaced);
+       } else {
+         _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced,
+             segmentName);
+       }
+       removeSegment(oldSegment, validDocIdsForOldSegment);
+     }
+   } finally {
+     segmentLock.unlock();
+   }
+
+   // Remember the old segment so that a later removeSegment(oldSegment) call becomes a no-op.
+   if (!(oldSegment instanceof EmptyIndexSegment)) {
+     _replacedSegments.add(oldSegment);
+   }
+
+   // Update metrics
+   long numPrimaryKeys = getNumPrimaryKeys();
+   _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+       numPrimaryKeys);
+
+   _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
+ }
+
+ /**
+  * Removes the primary keys of the given docs of {@code segment} from the implementation's key store. Callers in
+  * this class invoke it with a non-empty bitmap while holding the segment lock.
+  *
+  * @param segment segment whose keys are removed
+  * @param validDocIds doc ids whose primary keys should be removed
+  */
+ protected abstract void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds);
+
+ /**
+  * Removes a segment from the upsert metadata and refreshes the primary-key-count gauge.
+  *
+  * <p>Nothing is removed (and the gauge is left untouched) when the segment was previously recorded as replaced,
+  * or when it has no valid docs.
+  *
+  * @param segment segment to remove
+  */
+ @Override
+ public void removeSegment(IndexSegment segment) {
+   String segmentName = segment.getSegmentName();
+   String segmentKind = segment instanceof ImmutableSegment ? "immutable" : "mutable";
+   _logger.info("Removing {} segment: {}, current primary key count: {}", segmentKind, segmentName,
+       getNumPrimaryKeys());
+
+   // A replaced segment already had its keys handled during replaceSegment(); just forget it.
+   boolean wasReplaced = _replacedSegments.remove(segment);
+   if (wasReplaced) {
+     _logger.info("Skip removing replaced segment: {}", segmentName);
+     return;
+   }
+
+   Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+   segmentLock.lock();
+   try {
+     MutableRoaringBitmap docIdsToRemove = null;
+     if (segment.getValidDocIds() != null) {
+       docIdsToRemove = segment.getValidDocIds().getMutableRoaringBitmap();
+     }
+
+     if (docIdsToRemove != null && !docIdsToRemove.isEmpty()) {
+       _logger.info("Removing {} primary keys for segment: {}", docIdsToRemove.getCardinality(), segmentName);
+       removeSegment(segment, docIdsToRemove);
+     } else {
+       _logger.info("Skip removing segment without valid docs: {}", segmentName);
+       return;
+     }
+   } finally {
+     segmentLock.unlock();
+   }
+
+   // Update metrics
+   long remainingPrimaryKeys = getNumPrimaryKeys();
+   _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
+       remainingPrimaryKeys);
+
+   _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, remainingPrimaryKeys);
+ }
+
+ /**
+  * Counts one out-of-order event for a partial-upsert table and emits a rate-limited warning: at most one log
+  * line per {@link #OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS}, carrying the number of events skipped since the
+  * previous report.
+  *
+  * @param currentComparisonValue comparison value of the record currently tracked for the key
+  * @param recordComparisonValue comparison value of the incoming record
+  */
+ protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) {
+   _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
+   _numOutOfOrderEvents++;
+   long currentTimeNs = System.nanoTime();
+   // The Long.MIN_VALUE "never reported" sentinel must be tested explicitly: currentTimeNs - Long.MIN_VALUE
+   // overflows to a negative number for any non-negative nanoTime(), which would suppress the warning forever.
+   boolean neverReported = _lastOutOfOrderEventReportTimeNs == Long.MIN_VALUE;
+   if (neverReported
+       || currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
+     _logger.warn("Skipped {} out-of-order events for partial-upsert table (the last event has current comparison "
+         + "value: {}, record comparison value: {})", _numOutOfOrderEvents, currentComparisonValue,
+         recordComparisonValue);
+     _lastOutOfOrderEventReportTimeNs = currentTimeNs;
+     _numOutOfOrderEvents = 0;
+   }
+ }
+
+ /**
+  * Logs the current primary key count. This base implementation does nothing else.
+  *
+  * @throws IOException declared in the signature; not thrown by this base implementation
+  */
+ @Override
+ public void close()
+     throws IOException {
+   long numPrimaryKeys = getNumPrimaryKeys();
+   _logger.info("Closing the metadata manager, current primary key count: {}", numPrimaryKeys);
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 1654eaca0d..125f61fe47 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,27 +19,20 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.SegmentLocks;
-import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
@@ -48,8 +41,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -57,92 +48,28 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@ThreadSafe
-public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUpsertMetadataManager {
- private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
+public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {
- private final String _tableNameWithType;
- private final int _partitionId;
- private final List<String> _primaryKeyColumns;
- private final String _comparisonColumn;
- private final HashFunction _hashFunction;
- private final PartialUpsertHandler _partialUpsertHandler;
- private final ServerMetrics _serverMetrics;
- private final Logger _logger;
-
- // TODO(upsert): consider an off-heap KV store to persist this mapping to improve the recovery speed.
@VisibleForTesting
final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
- @VisibleForTesting
- final Set<IndexSegment> _replacedSegments = ConcurrentHashMap.newKeySet();
-
// Reused for reading previous record during partial upsert
private final GenericRow _reuse = new GenericRow();
- private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
- private int _numOutOfOrderEvents = 0;
-
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
@Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
- _tableNameWithType = tableNameWithType;
- _partitionId = partitionId;
- _primaryKeyColumns = primaryKeyColumns;
- _comparisonColumn = comparisonColumn;
- _hashFunction = hashFunction;
- _partialUpsertHandler = partialUpsertHandler;
- _serverMetrics = serverMetrics;
- _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
+ super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumn, hashFunction, partialUpsertHandler,
+ serverMetrics);
}
@Override
- public List<String> getPrimaryKeyColumns() {
- return _primaryKeyColumns;
+ protected long getNumPrimaryKeys() {
+ // Report the number of tracked primary keys (map entries), not the number of primary key columns.
+ return _primaryKeyToRecordLocationMap.size();
}
@Override
- public void addSegment(ImmutableSegment segment) {
- addSegment(segment, null, null);
- }
-
- @VisibleForTesting
- void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable Iterator<RecordInfo> recordInfoIterator) {
- String segmentName = segment.getSegmentName();
- _logger.info("Adding segment: {}, current primary key count: {}", segmentName,
- _primaryKeyToRecordLocationMap.size());
-
- if (segment instanceof EmptyIndexSegment) {
- _logger.info("Skip adding empty segment: {}", segmentName);
- return;
- }
-
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
- segmentLock.lock();
- try {
- Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
- "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
- _tableNameWithType);
- if (validDocIds == null) {
- validDocIds = new ThreadSafeMutableRoaringBitmap();
- }
- if (recordInfoIterator == null) {
- recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
- }
- addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, null, null);
- } finally {
- segmentLock.unlock();
- }
-
- // Update metrics
- int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
- _logger.info("Finished adding segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
- }
-
- private void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
@Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
String segmentName = segment.getSegmentName();
@@ -230,6 +157,24 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
}
}
+ /**
+  * Drops from the key map every primary key of the given docs whose recorded location still points at
+  * {@code segment}; keys that now point at another segment are left as they are.
+  *
+  * @param segment segment whose keys are removed
+  * @param validDocIds doc ids to process; asserted to be non-empty
+  */
+ @Override
+ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+   assert !validDocIds.isEmpty();
+   // One key holder is allocated up front and handed to UpsertUtils.getPrimaryKey for every doc.
+   PrimaryKey reusedKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
+   PeekableIntIterator docIdIterator = validDocIds.getIntIterator();
+   while (docIdIterator.hasNext()) {
+     UpsertUtils.getPrimaryKey(segment, _primaryKeyColumns, docIdIterator.next(), reusedKey);
+     Object hashedKey = HashUtils.hashPrimaryKey(reusedKey, _hashFunction);
+     // Returning null from the remapping function removes the entry.
+     _primaryKeyToRecordLocationMap.computeIfPresent(hashedKey,
+         (key, location) -> location.getSegment() == segment ? null : location);
+   }
+ }
+
@Override
public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
@@ -265,129 +210,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
_primaryKeyToRecordLocationMap.size());
}
- @Override
- public void replaceSegment(ImmutableSegment segment, IndexSegment oldSegment) {
- replaceSegment(segment, null, null, oldSegment);
- }
-
- @VisibleForTesting
- void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
- String segmentName = segment.getSegmentName();
- Preconditions.checkArgument(segmentName.equals(oldSegment.getSegmentName()),
- "Cannot replace segment with different name for table: {}, old segment: {}, new segment: {}",
- _tableNameWithType, oldSegment.getSegmentName(), segmentName);
- _logger.info("Replacing {} segment: {}, current primary key count: {}",
- oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
- _primaryKeyToRecordLocationMap.size());
-
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
- segmentLock.lock();
- try {
- MutableRoaringBitmap validDocIdsForOldSegment =
- oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (segment instanceof EmptyIndexSegment) {
- _logger.info("Skip adding empty segment: {}", segmentName);
- } else {
- Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
- "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
- _tableNameWithType);
- if (validDocIds == null) {
- validDocIds = new ThreadSafeMutableRoaringBitmap();
- }
- if (recordInfoIterator == null) {
- recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
- }
- addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
- validDocIdsForOldSegment);
- }
-
- if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
- int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
- if (_partialUpsertHandler != null) {
- // For partial-upsert table, because we do not restore the original record location when removing the primary
- // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
- // consuming segment is replaced by a committed segment that is consumed from a different server with
- // different records (some stream consumer cannot guarantee consuming the messages in the same order).
- _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
- + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
- numKeysNotReplaced);
- } else {
- _logger.info("Found {} primary keys not replaced when replacing segment: {}", numKeysNotReplaced,
- segmentName);
- }
- removeSegment(oldSegment, validDocIdsForOldSegment);
- }
- } finally {
- segmentLock.unlock();
- }
-
- if (!(oldSegment instanceof EmptyIndexSegment)) {
- _replacedSegments.add(oldSegment);
- }
-
- // Update metrics
- int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
- _logger.info("Finished replacing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
- }
-
- @Override
- public void removeSegment(IndexSegment segment) {
- String segmentName = segment.getSegmentName();
- _logger.info("Removing {} segment: {}, current primary key count: {}",
- segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName,
- _primaryKeyToRecordLocationMap.size());
-
- if (_replacedSegments.remove(segment)) {
- _logger.info("Skip removing replaced segment: {}", segmentName);
- return;
- }
-
- Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
- segmentLock.lock();
- try {
- MutableRoaringBitmap validDocIds =
- segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
- if (validDocIds == null || validDocIds.isEmpty()) {
- _logger.info("Skip removing segment without valid docs: {}", segmentName);
- return;
- }
-
- _logger.info("Removing {} primary keys for segment: {}", validDocIds.getCardinality(), segmentName);
- removeSegment(segment, validDocIds);
- } finally {
- segmentLock.unlock();
- }
-
- // Update metrics
- int numPrimaryKeys = _primaryKeyToRecordLocationMap.size();
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
- numPrimaryKeys);
-
- _logger.info("Finished removing segment: {}, current primary key count: {}", segmentName, numPrimaryKeys);
- }
-
- private void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
- assert !validDocIds.isEmpty();
- PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
- PeekableIntIterator iterator = validDocIds.getIntIterator();
- while (iterator.hasNext()) {
- int docId = iterator.next();
- UpsertUtils.getPrimaryKey(segment, _primaryKeyColumns, docId, primaryKey);
- _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(primaryKey, _hashFunction),
- (pk, recordLocation) -> {
- if (recordLocation.getSegment() == segment) {
- return null;
- }
- return recordLocation;
- });
- }
- }
-
@Override
public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
// Directly return the record when partial-upsert is not enabled
@@ -410,16 +232,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
if (previousRecord != null) {
return _partialUpsertHandler.merge(previousRecord, record);
} else {
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
- _numOutOfOrderEvents++;
- long currentTimeNs = System.nanoTime();
- if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
- _logger.warn("Skipped {} out-of-order events for partial-upsert table (the last event has current comparison "
- + "value: {}, record comparison value: {})", _numOutOfOrderEvents,
- currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
- _lastOutOfOrderEventReportTimeNs = currentTimeNs;
- _numOutOfOrderEvents = 0;
- }
+ handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
return record;
}
} else {
@@ -428,12 +241,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
}
}
- @Override
- public void close() {
- _logger.info("Closing metadata manager for table {} and partition {}, current primary key count: {}",
- _tableNameWithType, _partitionId, _primaryKeyToRecordLocationMap.size());
- }
-
@VisibleForTesting
static class RecordLocation {
private final IndexSegment _segment;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 5c5c357079..9c22316703 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
@@ -39,7 +40,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
}
@Override
- public void close() {
+ public void close()
+ throws IOException {
for (ConcurrentMapPartitionUpsertMetadataManager partitionUpsertMetadataManager
: _partitionMetadataManagerMap.values()) {
partitionUpsertMetadataManager.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org