You are viewing a plain-text version of this content. The canonical version is linked from the original archive page; that hyperlink was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/19 19:10:28 UTC
[pinot] branch master updated: For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 517b8813b8 For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)
517b8813b8 is described below
commit 517b8813b8029bfcb75f90db043817256efce997
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Jun 19 12:10:23 2022 -0700
For dedup/partial-upsert, check all segments loaded when creating the consuming segment (#8923)
---
.../manager/realtime/RealtimeTableDataManager.java | 181 +++++++++++----------
.../local/dedup/PartitionDedupMetadataManager.java | 73 +++------
.../local/dedup/TableDedupMetadataManager.java | 9 +-
.../segment/local/upsert/PartialUpsertHandler.java | 29 +---
.../upsert/PartitionUpsertMetadataManager.java | 13 +-
.../local/upsert/TableUpsertMetadataManager.java | 4 +
.../local/utils/tablestate/TableStateUtils.java | 18 +-
.../dedup/PartitionDedupMetadataManagerTest.java | 109 +++++--------
.../mutable/MutableSegmentDedupeTest.java | 19 +--
.../local/upsert/PartialUpsertHandlerTest.java | 16 +-
10 files changed, 186 insertions(+), 285 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 36740212fc..195e5e023e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager.realtime;
import com.google.common.base.Preconditions;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
@@ -64,10 +64,10 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.utils.RecordInfo;
import org.apache.pinot.segment.local.utils.SchemaUtils;
+import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.DedupConfig;
-import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -115,9 +115,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
// likely that we get fresh data each time instead of multiple copies of roughly same data.
private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
- private UpsertConfig.Mode _upsertMode;
- private TableUpsertMetadataManager _tableUpsertMetadataManager;
+ private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();
+
private TableDedupMetadataManager _tableDedupMetadataManager;
+ private TableUpsertMetadataManager _tableUpsertMetadataManager;
private List<String> _primaryKeyColumns;
private String _upsertComparisonColumn;
@@ -133,9 +134,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
try {
_statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
} catch (IOException | ClassNotFoundException e) {
- _logger
- .error("Error reading history object for table {} from {}", _tableNameWithType, statsFile.getAbsolutePath(),
- e);
+ _logger.error("Error reading history object for table {} from {}", _tableNameWithType,
+ statsFile.getAbsolutePath(), e);
File savedFile = new File(_tableDataDir, STATS_FILE_NAME + "." + UUID.randomUUID());
try {
FileUtils.moveFile(statsFile, savedFile);
@@ -156,64 +156,62 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
String consumerDirPath = getConsumerDir();
File consumerDir = new File(consumerDirPath);
+ if (consumerDir.exists()) {
+ File[] segmentFiles = consumerDir.listFiles((dir, name) -> !name.equals(STATS_FILE_NAME));
+ Preconditions.checkState(segmentFiles != null, "Failed to list segment files from consumer dir: {} for table: {}",
+ consumerDirPath, _tableNameWithType);
+ for (File file : segmentFiles) {
+ if (file.delete()) {
+ _logger.info("Deleted old file {}", file.getAbsolutePath());
+ } else {
+ _logger.error("Cannot delete file {}", file.getAbsolutePath());
+ }
+ }
+ }
- // NOTE: Upsert has to be set up when starting the server. Changing the table config without restarting the server
- // won't enable/disable the upsert on the fly.
+ // Set up dedup/upsert metadata manager
+ // NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the
+ // server won't enable/disable them on the fly.
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType);
- _upsertMode = tableConfig.getUpsertMode();
- if (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled()) {
+
+ DedupConfig dedupConfig = tableConfig.getDedupConfig();
+ boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
+ if (dedupEnabled) {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
+
_primaryKeyColumns = schema.getPrimaryKeyColumns();
- DedupConfig dedupConfig = tableConfig.getDedupConfig();
- HashFunction dedupHashFunction = dedupConfig.getHashFunction();
- _tableDedupMetadataManager =
- new TableDedupMetadataManager(_helixManager, _tableNameWithType, _primaryKeyColumns, _serverMetrics,
- dedupHashFunction);
+ Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
+ "Primary key columns must be configured for dedup");
+ _tableDedupMetadataManager = new TableDedupMetadataManager(_tableNameWithType, _primaryKeyColumns, _serverMetrics,
+ dedupConfig.getHashFunction());
}
- if (isUpsertEnabled()) {
- UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
- assert upsertConfig != null;
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) {
+ Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s",
+ _tableUpsertMetadataManager);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
- PartialUpsertHandler partialUpsertHandler = null;
- if (isPartialUpsertEnabled()) {
- String comparisonColumn = upsertConfig.getComparisonColumn();
- if (comparisonColumn == null) {
- comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
- }
- partialUpsertHandler = new PartialUpsertHandler(_helixManager, _tableNameWithType, schema,
- upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
- comparisonColumn);
- }
- HashFunction hashFunction = upsertConfig.getHashFunction();
- _tableUpsertMetadataManager =
- new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler, hashFunction);
_primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
"Primary key columns must be configured for upsert");
String comparisonColumn = upsertConfig.getComparisonColumn();
_upsertComparisonColumn =
comparisonColumn != null ? comparisonColumn : tableConfig.getValidationConfig().getTimeColumnName();
- }
- if (consumerDir.exists()) {
- File[] segmentFiles = consumerDir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return !name.equals(STATS_FILE_NAME);
- }
- });
- for (File file : segmentFiles) {
- if (file.delete()) {
- _logger.info("Deleted old file {}", file.getAbsolutePath());
- } else {
- _logger.error("Cannot delete file {}", file.getAbsolutePath());
- }
+ PartialUpsertHandler partialUpsertHandler = null;
+ if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+ assert upsertConfig.getPartialUpsertStrategies() != null;
+ partialUpsertHandler = new PartialUpsertHandler(schema, upsertConfig.getPartialUpsertStrategies(),
+ upsertConfig.getDefaultPartialUpsertStrategy(), _upsertComparisonColumn);
}
+
+ _tableUpsertMetadataManager =
+ new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler,
+ upsertConfig.getHashFunction());
}
}
@@ -266,11 +264,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
}
public boolean isUpsertEnabled() {
- return _upsertMode != UpsertConfig.Mode.NONE;
+ return _tableUpsertMetadataManager != null;
}
public boolean isPartialUpsertEnabled() {
- return _upsertMode == UpsertConfig.Mode.PARTIAL;
+ return _tableUpsertMetadataManager != null && _tableUpsertMetadataManager.isPartialUpsertEnabled();
}
/*
@@ -358,8 +356,19 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
_tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(
partitionGroupId) : null;
PartitionDedupMetadataManager partitionDedupMetadataManager =
- _tableDedupMetadataManager != null ? _tableDedupMetadataManager
- .getOrCreatePartitionManager(partitionGroupId) : null;
+ _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
+ : null;
+ // For dedup and partial-upsert, wait for all segments loaded before creating the consuming segment
+ if (isDedupEnabled() || isPartialUpsertEnabled()) {
+ if (!_allSegmentsLoaded.get()) {
+ synchronized (_allSegmentsLoaded) {
+ if (!_allSegmentsLoaded.get()) {
+ TableStateUtils.waitForAllSegmentsLoaded(_helixManager, _tableNameWithType);
+ _allSegmentsLoaded.set(true);
+ }
+ }
+ }
+ }
segmentDataManager =
new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
@@ -390,10 +399,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
private void buildDedupMeta(ImmutableSegmentImpl immutableSegment) {
// TODO(saurabh) refactor commons code with handleUpsert
String segmentName = immutableSegment.getSegmentName();
- Integer partitionGroupId = SegmentUtils
- .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
- Preconditions.checkNotNull(partitionGroupId, String
- .format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
+ Integer partitionGroupId =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
+ _primaryKeyColumns.get(0));
+ Preconditions.checkNotNull(partitionGroupId,
+ String.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
_tableNameWithType));
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
@@ -403,10 +413,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
String segmentName = immutableSegment.getSegmentName();
- Integer partitionGroupId = SegmentUtils
- .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
- Preconditions.checkNotNull(partitionGroupId, String
- .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
+ Integer partitionGroupId =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
+ _primaryKeyColumns.get(0));
+ Preconditions.checkNotNull(partitionGroupId,
+ String.format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
_tableNameWithType));
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId);
@@ -417,37 +428,35 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
for (String primaryKeyColumn : _primaryKeyColumns) {
columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(immutableSegment, primaryKeyColumn));
}
- columnToReaderMap
- .put(_upsertComparisonColumn, new PinotSegmentColumnReader(immutableSegment, _upsertComparisonColumn));
+ columnToReaderMap.put(_upsertComparisonColumn,
+ new PinotSegmentColumnReader(immutableSegment, _upsertComparisonColumn));
int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
int numPrimaryKeyColumns = _primaryKeyColumns.size();
- Iterator<RecordInfo> recordInfoIterator =
- new Iterator<RecordInfo>() {
- private int _docId = 0;
+ Iterator<RecordInfo> recordInfoIterator = new Iterator<RecordInfo>() {
+ private int _docId = 0;
- @Override
- public boolean hasNext() {
- return _docId < numTotalDocs;
- }
+ @Override
+ public boolean hasNext() {
+ return _docId < numTotalDocs;
+ }
- @Override
- public RecordInfo next() {
- Object[] values = new Object[numPrimaryKeyColumns];
- for (int i = 0; i < numPrimaryKeyColumns; i++) {
- Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
- if (value instanceof byte[]) {
- value = new ByteArray((byte[]) value);
- }
- values[i] = value;
- }
- PrimaryKey primaryKey = new PrimaryKey(values);
- Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
- Preconditions.checkState(upsertComparisonValue instanceof Comparable,
- "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
- return new RecordInfo(primaryKey, _docId++,
- (Comparable) upsertComparisonValue);
+ @Override
+ public RecordInfo next() {
+ Object[] values = new Object[numPrimaryKeyColumns];
+ for (int i = 0; i < numPrimaryKeyColumns; i++) {
+ Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
+ if (value instanceof byte[]) {
+ value = new ByteArray((byte[]) value);
}
- };
+ values[i] = value;
+ }
+ PrimaryKey primaryKey = new PrimaryKey(values);
+ Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
+ Preconditions.checkState(upsertComparisonValue instanceof Comparable,
+ "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
+ return new RecordInfo(primaryKey, _docId++, (Comparable) upsertComparisonValue);
+ }
+ };
partitionUpsertMetadataManager.addSegment(immutableSegment, recordInfoIterator);
}
@@ -526,8 +535,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
return
CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
- || CommonConstants.HTTPS_PROTOCOL
- .equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+ || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(
+ tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
}
private void downloadSegmentFromPeer(String segmentName, String downloadScheme,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 353d33c951..258aad7a5d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -24,25 +24,17 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.utils.HashUtils;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class PartitionDedupMetadataManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(PartitionDedupMetadataManager.class);
- private static boolean _allSegmentsLoaded;
-
- private final HelixManager _helixManager;
private final String _tableNameWithType;
private final List<String> _primaryKeyColumns;
private final int _partitionId;
@@ -52,9 +44,8 @@ public class PartitionDedupMetadataManager {
@VisibleForTesting
final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
- public PartitionDedupMetadataManager(HelixManager helixManager, String tableNameWithType,
- List<String> primaryKeyColumns, int partitionId, ServerMetrics serverMetrics, HashFunction hashFunction) {
- _helixManager = helixManager;
+ public PartitionDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns, int partitionId,
+ ServerMetrics serverMetrics, HashFunction hashFunction) {
_tableNameWithType = tableNameWithType;
_primaryKeyColumns = primaryKeyColumns;
_partitionId = partitionId;
@@ -64,7 +55,7 @@ public class PartitionDedupMetadataManager {
public void addSegment(IndexSegment segment) {
// Add all PKs to _primaryKeyToSegmentMap
- Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
+ Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
while (primaryKeyIterator.hasNext()) {
PrimaryKey pk = primaryKeyIterator.next();
_primaryKeyToSegmentMap.put(HashUtils.hashPrimaryKey(pk, _hashFunction), segment);
@@ -75,30 +66,29 @@ public class PartitionDedupMetadataManager {
public void removeSegment(IndexSegment segment) {
// TODO(saurabh): Explain reload scenario here
- Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment, _primaryKeyColumns);
+ Iterator<PrimaryKey> primaryKeyIterator = getPrimaryKeyIterator(segment);
while (primaryKeyIterator.hasNext()) {
PrimaryKey pk = primaryKeyIterator.next();
- _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction),
- (primaryKey, currentSegment) -> {
- if (currentSegment == segment) {
- return null;
- } else {
- return currentSegment;
- }
- });
+ _primaryKeyToSegmentMap.compute(HashUtils.hashPrimaryKey(pk, _hashFunction), (primaryKey, currentSegment) -> {
+ if (currentSegment == segment) {
+ return null;
+ } else {
+ return currentSegment;
+ }
+ });
}
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
_primaryKeyToSegmentMap.size());
}
@VisibleForTesting
- public static Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment, List<String> primaryKeyColumns) {
+ Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment segment) {
Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
- for (String primaryKeyColumn : primaryKeyColumns) {
+ for (String primaryKeyColumn : _primaryKeyColumns) {
columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(segment, primaryKeyColumn));
}
int numTotalDocs = segment.getSegmentMetadata().getTotalDocs();
- int numPrimaryKeyColumns = primaryKeyColumns.size();
+ int numPrimaryKeyColumns = _primaryKeyColumns.size();
return new Iterator<PrimaryKey>() {
private int _docId = 0;
@@ -111,7 +101,7 @@ public class PartitionDedupMetadataManager {
public PrimaryKey next() {
Object[] values = new Object[numPrimaryKeyColumns];
for (int i = 0; i < numPrimaryKeyColumns; i++) {
- Object value = columnToReaderMap.get(primaryKeyColumns.get(i)).getValue(_docId);
+ Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
if (value instanceof byte[]) {
value = new ByteArray((byte[]) value);
}
@@ -123,34 +113,13 @@ public class PartitionDedupMetadataManager {
};
}
- private synchronized void waitTillAllSegmentsLoaded() {
- if (_allSegmentsLoaded) {
- return;
- }
-
- while (!TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType)) {
- LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
- try {
- //noinspection BusyWait
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- _allSegmentsLoaded = true;
- }
-
public boolean checkRecordPresentOrUpdate(PrimaryKey pk, IndexSegment indexSegment) {
- if (!_allSegmentsLoaded) {
- waitTillAllSegmentsLoaded();
+ boolean present =
+ _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment) != null;
+ if (!present) {
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+ _primaryKeyToSegmentMap.size());
}
-
- boolean result =
- (_primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment)
- != null);
- _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
- _primaryKeyToSegmentMap.size());
-
- return result;
+ return present;
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index 6e25974970..dedcebfff0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -21,22 +21,19 @@ package org.apache.pinot.segment.local.dedup;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.config.table.HashFunction;
public class TableDedupMetadataManager {
private final Map<Integer, PartitionDedupMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
- private final HelixManager _helixManager;
private final String _tableNameWithType;
private final List<String> _primaryKeyColumns;
private final ServerMetrics _serverMetrics;
private final HashFunction _hashFunction;
- public TableDedupMetadataManager(HelixManager helixManager, String tableNameWithType, List<String> primaryKeyColumns,
+ public TableDedupMetadataManager(String tableNameWithType, List<String> primaryKeyColumns,
ServerMetrics serverMetrics, HashFunction hashFunction) {
- _helixManager = helixManager;
_tableNameWithType = tableNameWithType;
_primaryKeyColumns = primaryKeyColumns;
_serverMetrics = serverMetrics;
@@ -45,7 +42,7 @@ public class TableDedupMetadataManager {
public PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
- k -> new PartitionDedupMetadataManager(_helixManager, _tableNameWithType, _primaryKeyColumns, k,
- _serverMetrics, _hashFunction));
+ k -> new PartitionDedupMetadataManager(_tableNameWithType, _primaryKeyColumns, k, _serverMetrics,
+ _hashFunction));
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 720dc8a181..4a1cfad39f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -20,29 +20,22 @@ package org.apache.pinot.segment.local.upsert;
import java.util.HashMap;
import java.util.Map;
-import org.apache.helix.HelixManager;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
+
/**
* Handler for partial-upsert.
*/
public class PartialUpsertHandler {
// _column2Mergers maintains the mapping of merge strategies per columns.
- private final HelixManager _helixManager;
- private final String _tableNameWithType;
private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
- private boolean _allSegmentsLoaded;
- public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType, Schema schema,
- Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy,
- String comparisonColumn) {
- _helixManager = helixManager;
- _tableNameWithType = tableNameWithType;
+ public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
+ UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) {
for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
_column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
}
@@ -56,18 +49,6 @@ public class PartialUpsertHandler {
}
}
- /**
- * Returns {@code true} if all segments assigned to the current instance are loaded, {@code false} otherwise.
- * Consuming segment should perform this check to ensure all previous records are loaded before inserting new records.
- */
- public synchronized boolean isAllSegmentsLoaded() {
- if (_allSegmentsLoaded) {
- return true;
- }
- _allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
- return _allSegmentsLoaded;
- }
-
/**
* Merges 2 records and returns the merged record.
* We used a map to indicate all configured fields for partial upsert. For these fields
@@ -91,8 +72,8 @@ public class PartialUpsertHandler {
newRecord.putValue(column, previousRecord.getValue(column));
newRecord.removeNullValueField(column);
} else {
- newRecord
- .putValue(column, entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column)));
+ newRecord.putValue(column,
+ entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column)));
}
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 02289cd1c1..041a86443a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory;
* </li>
* </ul>
*/
-@SuppressWarnings({"rawtypes", "unchecked"})
+@SuppressWarnings("unchecked")
@ThreadSafe
public class PartitionUpsertMetadataManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
@@ -204,17 +204,6 @@ public class PartitionUpsertMetadataManager {
return record;
}
- // Ensure all previous records are loaded before inserting new records
- while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
- LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
- try {
- //noinspection BusyWait
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
RecordLocation currentRecordLocation =
_primaryKeyToRecordLocationMap.get(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction));
if (currentRecordLocation != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index 6ccd9315a6..384268a494 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -50,4 +50,8 @@ public class TableUpsertMetadataManager {
k -> new PartitionUpsertMetadataManager(_tableNameWithType, k, _serverMetrics, _partialUpsertHandler,
_hashFunction));
}
+
+ public boolean isPartialUpsertEnabled() {
+ return _partialUpsertHandler != null;
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index b83ac7d51d..2392a0bcd1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -29,6 +29,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class TableStateUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TableStateUtils.class);
@@ -72,8 +73,8 @@ public class TableStateUtils {
String actualState = currentStateMap.get(segmentName);
if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState)) {
if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
- LOGGER
- .error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType, expectedState);
+ LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, tableNameWithType,
+ expectedState);
} else {
LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName, tableNameWithType,
expectedState, actualState);
@@ -85,4 +86,17 @@ public class TableStateUtils {
LOGGER.info("All segments loaded for table: {}", tableNameWithType);
return true;
}
+
+ public static void waitForAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
+ try {
+ while (!TableStateUtils.isAllSegmentsLoaded(helixManager, tableNameWithType)) {
+ LOGGER.info("Sleeping 1 second waiting for all segments loaded for table: {}", tableNameWithType);
+ //noinspection BusyWait
+ Thread.sleep(1000L);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Caught exception while waiting for all segments loaded for table: " + tableNameWithType, e);
+ }
+ }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
index 124aad1c63..39f7d6ae98 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
@@ -19,27 +19,22 @@
package org.apache.pinot.segment.local.dedup;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.RecordInfo;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.mockito.MockedStatic;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
@@ -49,19 +44,12 @@ public class PartitionDedupMetadataManagerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
- @BeforeClass
- public void init() {
- MockedStatic mocked = mockStatic(TableStateUtils.class);
- mocked.when(() -> TableStateUtils.isAllSegmentsLoaded(any(), any())).thenReturn(true);
- }
-
@Test
public void verifyAddRemoveSegment() {
HashFunction hashFunction = HashFunction.NONE;
- PartitionDedupMetadataManager partitionDedupMetadataManager =
- new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
- mock(ServerMetrics.class), hashFunction);
- Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+ TestMetadataManager metadataManager =
+ new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+ Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
// Add the first segment
List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -71,41 +59,24 @@ public class PartitionDedupMetadataManagerTest {
pkList1.add(getPrimaryKey(0));
pkList1.add(getPrimaryKey(1));
pkList1.add(getPrimaryKey(0));
+ metadataManager._primaryKeyIterator = pkList1.iterator();
ImmutableSegmentImpl segment1 = mockSegment(1);
- MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
- mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
- .thenReturn(pkList1.iterator());
-
- partitionDedupMetadataManager.addSegment(segment1);
+ metadataManager.addSegment(segment1);
checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
- pkList1 = new ArrayList<>();
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(2));
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(0));
-
- mocked.close();
- mocked = mockStatic(PartitionDedupMetadataManager.class);
- mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
- .thenReturn(pkList1.iterator());
-
- partitionDedupMetadataManager.removeSegment(segment1);
+ metadataManager._primaryKeyIterator = pkList1.iterator();
+ metadataManager.removeSegment(segment1);
Assert.assertEquals(recordLocationMap.size(), 0);
- mocked.close();
}
@Test
public void verifyReloadSegment() {
HashFunction hashFunction = HashFunction.NONE;
- PartitionDedupMetadataManager partitionDedupMetadataManager =
- new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
- mock(ServerMetrics.class), hashFunction);
- Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+ TestMetadataManager metadataManager =
+ new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+ Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
// Add the first segment
List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -115,45 +86,28 @@ public class PartitionDedupMetadataManagerTest {
pkList1.add(getPrimaryKey(0));
pkList1.add(getPrimaryKey(1));
pkList1.add(getPrimaryKey(0));
+ metadataManager._primaryKeyIterator = pkList1.iterator();
ImmutableSegmentImpl segment1 = mockSegment(1);
- MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
- mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
- .thenReturn(pkList1.iterator());
-
- partitionDedupMetadataManager.addSegment(segment1);
+ metadataManager.addSegment(segment1);
// Remove another segment with same PK rows
- pkList1 = new ArrayList<>();
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(2));
- pkList1.add(getPrimaryKey(0));
- pkList1.add(getPrimaryKey(1));
- pkList1.add(getPrimaryKey(0));
+ metadataManager._primaryKeyIterator = pkList1.iterator();
ImmutableSegmentImpl segment2 = mockSegment(1);
-
- mocked.close();
- mocked = mockStatic(PartitionDedupMetadataManager.class);
- mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
- .thenReturn(pkList1.iterator());
-
- partitionDedupMetadataManager.removeSegment(segment2);
+ metadataManager.removeSegment(segment2);
Assert.assertEquals(recordLocationMap.size(), 3);
// Keys should still exist
checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
checkRecordLocation(recordLocationMap, 1, segment1, hashFunction);
checkRecordLocation(recordLocationMap, 2, segment1, hashFunction);
- mocked.close();
}
@Test
public void verifyAddRow() {
HashFunction hashFunction = HashFunction.NONE;
- PartitionDedupMetadataManager partitionDedupMetadataManager =
- new PartitionDedupMetadataManager(mock(HelixManager.class), REALTIME_TABLE_NAME, null, 0,
- mock(ServerMetrics.class), hashFunction);
- Map<Object, IndexSegment> recordLocationMap = partitionDedupMetadataManager._primaryKeyToSegmentMap;
+ TestMetadataManager metadataManager =
+ new TestMetadataManager(REALTIME_TABLE_NAME, null, 0, mock(ServerMetrics.class), hashFunction);
+ Map<Object, IndexSegment> recordLocationMap = metadataManager._primaryKeyToSegmentMap;
// Add the first segment
List<PrimaryKey> pkList1 = new ArrayList<>();
@@ -163,28 +117,25 @@ public class PartitionDedupMetadataManagerTest {
pkList1.add(getPrimaryKey(0));
pkList1.add(getPrimaryKey(1));
pkList1.add(getPrimaryKey(0));
+ metadataManager._primaryKeyIterator = pkList1.iterator();
ImmutableSegmentImpl segment1 = mockSegment(1);
- MockedStatic mocked = mockStatic(PartitionDedupMetadataManager.class);
- mocked.when(() -> PartitionDedupMetadataManager.getPrimaryKeyIterator(any(), any()))
- .thenReturn(pkList1.iterator());
- partitionDedupMetadataManager.addSegment(segment1);
- mocked.close();
+ metadataManager.addSegment(segment1);
// Same PK exists
RecordInfo recordInfo = mock(RecordInfo.class);
when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(0));
ImmutableSegmentImpl segment2 = mockSegment(2);
- Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+ Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
// New PK
when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
- Assert.assertFalse(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+ Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
// Same PK as the one recently ingested
when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
- Assert.assertTrue(partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+ Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
}
private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {
@@ -208,4 +159,18 @@ public class PartitionDedupMetadataManagerTest {
assertNotNull(indexSegment);
assertSame(indexSegment, segment);
}
+
+  private static class TestMetadataManager extends PartitionDedupMetadataManager {
+    Iterator<PrimaryKey> _primaryKeyIterator; // returned for every segment; tests assign it before add/remove
+
+    TestMetadataManager(String tableName, List<String> pkColumns, int partition, ServerMetrics metrics,
+        HashFunction hashFn) {
+      super(tableName, pkColumns, partition, metrics, hashFn);
+    }
+
+    @Override
+    Iterator<PrimaryKey> getPrimaryKeyIterator(IndexSegment ignoredSegment) {
+      return _primaryKeyIterator;
+    }
+  }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 7acfbdccdc..6f310c863c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -22,12 +22,10 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
import java.io.File;
import java.net.URL;
import java.util.Collections;
-import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
-import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -38,28 +36,16 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mockStatic;
-
public class MutableSegmentDedupeTest {
-
private static final String SCHEMA_FILE_PATH = "data/test_dedup_schema.json";
private static final String DATA_FILE_PATH = "data/test_dedup_data.json";
private MutableSegmentImpl _mutableSegmentImpl;
- @BeforeClass
- public void init() {
- MockedStatic mocked = mockStatic(TableStateUtils.class);
- mocked.when(() -> TableStateUtils.isAllSegmentsLoaded(any(), any())).thenReturn(true);
- }
-
private void setup(boolean dedupEnabled)
throws Exception {
URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
@@ -70,9 +56,8 @@ public class MutableSegmentDedupeTest {
CompositeTransformer recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
File jsonFile = new File(dataResourceUrl.getFile());
PartitionDedupMetadataManager partitionDedupMetadataManager =
- (dedupEnabled) ? new TableDedupMetadataManager(Mockito.mock(HelixManager.class), "testTable_REALTIME",
- schema.getPrimaryKeyColumns(), Mockito.mock(ServerMetrics.class),
- HashFunction.NONE).getOrCreatePartitionManager(0) : null;
+ (dedupEnabled) ? new TableDedupMetadataManager("testTable_REALTIME", schema.getPrimaryKeyColumns(),
+ Mockito.mock(ServerMetrics.class), HashFunction.NONE).getOrCreatePartitionManager(0) : null;
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, null, "secondsSinceEpoch", null, partitionDedupMetadataManager);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 471a46c803..31a508e988 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -21,12 +21,10 @@ package org.apache.pinot.segment.local.upsert;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import org.apache.helix.HelixManager;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.mockito.Mockito;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -38,19 +36,14 @@ public class PartialUpsertHandlerTest {
@Test
public void testMerge() {
- HelixManager helixManager = Mockito.mock(HelixManager.class);
-
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
.addSingleValueDimension("field1", FieldSpec.DataType.LONG)
.addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
.setPrimaryKeyColumns(Arrays.asList("pk")).build();
-
- String realtimeTableName = "testTable_REALTIME";
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
PartialUpsertHandler handler =
- new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
- UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+ new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
// both records are null.
GenericRow previousRecord = new GenericRow();
@@ -97,19 +90,14 @@ public class PartialUpsertHandlerTest {
@Test
public void testMergeWithDefaultPartialUpsertStrategy() {
- HelixManager helixManager = Mockito.mock(HelixManager.class);
-
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING)
.addSingleValueDimension("field1", FieldSpec.DataType.LONG).addMetric("field2", FieldSpec.DataType.LONG)
.addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
.setPrimaryKeyColumns(Arrays.asList("pk")).build();
-
- String realtimeTableName = "testTable_REALTIME";
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
PartialUpsertHandler handler =
- new PartialUpsertHandler(helixManager, realtimeTableName, schema, partialUpsertStrategies,
- UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+ new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
// previousRecord is null default value, while newRecord is not.
GenericRow previousRecord = new GenericRow();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org