You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/12 00:39:46 UTC
(pinot) branch master updated: Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 92e26a5847 Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120)
92e26a5847 is described below
commit 92e26a584723e02927fb25819c07174f2feae594
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Dec 11 16:39:40 2023 -0800
Introduce UpsertContext to simplify the upsert metadata manager constructor (#12120)
---
.../manager/realtime/RealtimeTableDataManager.java | 3 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 18 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 31 ++--
.../upsert/BaseTableUpsertMetadataManager.java | 80 ++++-----
...oncurrentMapPartitionUpsertMetadataManager.java | 20 +--
.../ConcurrentMapTableUpsertMetadataManager.java | 4 +-
.../local/upsert/TableUpsertMetadataManager.java | 6 +-
.../pinot/segment/local/upsert/UpsertContext.java | 197 +++++++++++++++++++++
.../MutableSegmentImplUpsertComparisonColTest.java | 37 ++--
.../mutable/MutableSegmentImplUpsertTest.java | 26 ++-
...rrentMapPartitionUpsertMetadataManagerTest.java | 147 +++++++--------
...oncurrentMapTableUpsertMetadataManagerTest.java | 27 +--
12 files changed, 395 insertions(+), 201 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index ffe0d47b46..8bf595d37a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,8 +207,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
// NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to
// load segments into it
_tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig);
- _tableUpsertMetadataManager.init(tableConfig, schema, this, _serverMetrics, _helixManager,
- _segmentPreloadExecutor);
+ _tableUpsertMetadataManager.init(tableConfig, schema, this, _helixManager, _segmentPreloadExecutor);
}
// For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index a73f9c471f..cf3e33f938 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -38,12 +38,12 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -54,7 +54,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
@@ -62,6 +61,7 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -126,13 +126,17 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
@BeforeClass
public void loadSegment()
throws Exception {
+ ServerMetrics.register(mock(ServerMetrics.class));
_indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
- ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
- ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
- new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null,
- false, 0, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
+ UpsertContext upsertContext =
+ new UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+ .setPrimaryKeyColumns(Collections.singletonList("column6"))
+ .setComparisonColumns(Collections.singletonList("daysSinceEpoch")).setTableIndexDir(INDEX_DIR).build();
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, upsertContext);
+ ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(upsertMetadataManager,
+ new ThreadSafeMutableRoaringBitmap(), null);
}
@AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 9184da1587..b7f9696b11 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -65,6 +65,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final String _tableNameWithType;
protected final int _partitionId;
+ protected final UpsertContext _context;
protected final List<String> _primaryKeyColumns;
protected final List<String> _comparisonColumns;
protected final String _deleteRecordColumn;
@@ -97,25 +98,23 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
private int _numPendingOperations = 1;
private boolean _closed;
- protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
- List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
- HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
- _primaryKeyColumns = primaryKeyColumns;
- _comparisonColumns = comparisonColumns;
- _deleteRecordColumn = deleteRecordColumn;
- _hashFunction = hashFunction;
- _partialUpsertHandler = partialUpsertHandler;
- _enableSnapshot = enableSnapshot;
- _metadataTTL = metadataTTL;
- _deletedKeysTTL = deletedKeysTTL;
- _tableIndexDir = tableIndexDir;
- _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
- _serverMetrics = serverMetrics;
+ _context = context;
+ _primaryKeyColumns = context.getPrimaryKeyColumns();
+ _comparisonColumns = context.getComparisonColumns();
+ _deleteRecordColumn = context.getDeleteRecordColumn();
+ _hashFunction = context.getHashFunction();
+ _partialUpsertHandler = context.getPartialUpsertHandler();
+ _enableSnapshot = context.isSnapshotEnabled();
+ _snapshotLock = _enableSnapshot ? new ReentrantReadWriteLock() : null;
+ _metadataTTL = context.getMetadataTTL();
+ _deletedKeysTTL = context.getDeletedKeysTTL();
+ _tableIndexDir = context.getTableIndexDir();
+ _serverMetrics = ServerMetrics.get();
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
- if (metadataTTL > 0) {
+ if (_metadataTTL > 0) {
_largestSeenComparisonValue = loadWatermark();
} else {
_largestSeenComparisonValue = Double.MIN_VALUE;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 6c16b4e7f5..e20b264c70 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -36,7 +36,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -56,76 +55,67 @@ import org.slf4j.LoggerFactory;
public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableUpsertMetadataManager.class);
- protected TableConfig _tableConfig;
- protected Schema _schema;
- protected TableDataManager _tableDataManager;
protected String _tableNameWithType;
- protected List<String> _primaryKeyColumns;
- protected List<String> _comparisonColumns;
- protected String _deleteRecordColumn;
- protected HashFunction _hashFunction;
- protected PartialUpsertHandler _partialUpsertHandler;
- protected boolean _enableSnapshot;
- protected double _metadataTTL;
- protected double _deletedKeysTTL;
- protected File _tableIndexDir;
- protected ServerMetrics _serverMetrics;
+ protected TableDataManager _tableDataManager;
protected HelixManager _helixManager;
protected ExecutorService _segmentPreloadExecutor;
+ protected UpsertContext _context;
private volatile boolean _isPreloading = false;
@Override
- public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,
- ServerMetrics serverMetrics, HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor) {
- _tableConfig = tableConfig;
- _schema = schema;
- _tableDataManager = tableDataManager;
+ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager,
+ @Nullable ExecutorService segmentPreloadExecutor) {
_tableNameWithType = tableConfig.getTableName();
+ _tableDataManager = tableDataManager;
+ _helixManager = helixManager;
+ _segmentPreloadExecutor = segmentPreloadExecutor;
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
Preconditions.checkArgument(upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE,
"Upsert must be enabled for table: %s", _tableNameWithType);
- _primaryKeyColumns = schema.getPrimaryKeyColumns();
- Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
+ List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
+ Preconditions.checkArgument(!CollectionUtils.isEmpty(primaryKeyColumns),
"Primary key columns must be configured for upsert enabled table: %s", _tableNameWithType);
- _comparisonColumns = upsertConfig.getComparisonColumns();
- if (_comparisonColumns == null) {
- _comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
+ List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+ if (comparisonColumns == null) {
+ comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
}
- _deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
- _hashFunction = upsertConfig.getHashFunction();
-
+ PartialUpsertHandler partialUpsertHandler = null;
if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
Preconditions.checkArgument(partialUpsertStrategies != null,
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
- _partialUpsertHandler =
+ partialUpsertHandler =
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
- _comparisonColumns);
+ comparisonColumns);
}
- _enableSnapshot = upsertConfig.isEnableSnapshot();
- _metadataTTL = upsertConfig.getMetadataTTL();
- _deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
- _tableIndexDir = tableDataManager.getTableDataDir();
- _serverMetrics = serverMetrics;
- _helixManager = helixManager;
- _segmentPreloadExecutor = segmentPreloadExecutor;
-
- initCustomVariables();
-
+ String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
+ HashFunction hashFunction = upsertConfig.getHashFunction();
+ boolean enableSnapshot = upsertConfig.isEnableSnapshot();
+ boolean enablePreload = upsertConfig.isEnablePreload();
+ double metadataTTL = upsertConfig.getMetadataTTL();
+ double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
+ File tableIndexDir = tableDataManager.getTableDataDir();
+ _context = new UpsertContext.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setPrimaryKeyColumns(primaryKeyColumns).setComparisonColumns(comparisonColumns)
+ .setDeleteRecordColumn(deleteRecordColumn).setHashFunction(hashFunction)
+ .setPartialUpsertHandler(partialUpsertHandler).setEnableSnapshot(enableSnapshot).setEnablePreload(enablePreload)
+ .setMetadataTTL(metadataTTL).setDeletedKeysTTL(deletedKeysTTL).setTableIndexDir(tableIndexDir).build();
LOGGER.info(
"Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {},"
+ " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {},"
+ " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
- _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, upsertConfig.getMode(),
- _enableSnapshot, upsertConfig.isEnablePreload(), _metadataTTL, _deletedKeysTTL, _tableIndexDir);
+ primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction, upsertConfig.getMode(), enableSnapshot,
+ enablePreload, metadataTTL, deletedKeysTTL, tableIndexDir);
+
+ initCustomVariables();
- if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
+ if (enableSnapshot && enablePreload && segmentPreloadExecutor != null) {
// Preloading the segments with snapshots for fast upsert metadata recovery.
// Note that there is an implicit waiting logic between the thread doing the segment preloading here and the
// other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE).
@@ -226,7 +216,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
@VisibleForTesting
IndexLoadingConfig createIndexLoadingConfig() {
return new IndexLoadingConfig(_tableDataManager.getTableDataManagerConfig().getInstanceDataManagerConfig(),
- _tableConfig, _schema);
+ _context.getTableConfig(), _context.getSchema());
}
@VisibleForTesting
@@ -266,7 +256,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
}
private File getValidDocIdsSnapshotFile(String segmentName, String segmentTier) {
- File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _tableConfig);
+ File indexDir = _tableDataManager.getSegmentDataDir(segmentName, segmentTier, _context.getTableConfig());
return new File(SegmentDirectoryPaths.findSegmentDirectory(indexDir), V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
}
@@ -277,6 +267,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
@Override
public UpsertConfig.Mode getUpsertMode() {
- return _partialUpsertHandler == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL;
+ return _context.getPartialUpsertHandler() == null ? UpsertConfig.Mode.FULL : UpsertConfig.Mode.PARTIAL;
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index fe36054f18..576d679368 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,9 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
import java.util.Iterator;
-import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +28,6 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
@@ -38,7 +35,6 @@ import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.roaringbitmap.PeekableIntIterator;
@@ -58,12 +54,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
@VisibleForTesting
final ConcurrentHashMap<Object, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
- public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
- List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
- HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
- super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
- hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, deletedKeysTTL, tableIndexDir, serverMetrics);
+ public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
+ super(tableNameWithType, partitionId, context);
}
@Override
@@ -268,15 +260,15 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
if (numDeletedTTLKeys > 0) {
- _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}",
- numDeletedTTLKeys, _tableNameWithType);
+ _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}", numDeletedTTLKeys,
+ _tableNameWithType);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
numDeletedTTLKeys);
}
int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get();
if (numMetadataTTLKeys > 0) {
- _logger.info("Deleted {} primary keys based on metadataTTL in the table {}",
- numMetadataTTLKeys, _tableNameWithType);
+ _logger.info("Deleted {} primary keys based on metadataTTL in the table {}", numMetadataTTLKeys,
+ _tableNameWithType);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED,
numMetadataTTLKeys);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 2aeee10749..b0593f8d5f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -35,9 +35,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
@Override
public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
- k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
- _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
- _enableSnapshot, _metadataTTL, _deletedKeysTTL, _tableIndexDir, _serverMetrics));
+ k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index b6ae51b265..2ac107d790 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -35,8 +34,9 @@ import org.apache.pinot.spi.data.Schema;
*/
@ThreadSafe
public interface TableUpsertMetadataManager extends Closeable {
- void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics,
- HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor);
+
+ void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager,
+ @Nullable ExecutorService segmentPreloadExecutor);
PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
new file mode 100644
index 0000000000..707c22151c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class UpsertContext {
+ private final TableConfig _tableConfig;
+ private final Schema _schema;
+ private final List<String> _primaryKeyColumns;
+ private final List<String> _comparisonColumns;
+ private final String _deleteRecordColumn;
+ private final HashFunction _hashFunction;
+ private final PartialUpsertHandler _partialUpsertHandler;
+ private final boolean _enableSnapshot;
+ private final boolean _enablePreload;
+ private final double _metadataTTL;
+ private final double _deletedKeysTTL;
+ private final File _tableIndexDir;
+
+ private UpsertContext(TableConfig tableConfig, Schema schema, List<String> primaryKeyColumns,
+ List<String> comparisonColumns, @Nullable String deleteRecordColumn, HashFunction hashFunction,
+ @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, boolean enablePreload,
+ double metadataTTL, double deletedKeysTTL, File tableIndexDir) {
+ _tableConfig = tableConfig;
+ _schema = schema;
+ _primaryKeyColumns = primaryKeyColumns;
+ _comparisonColumns = comparisonColumns;
+ _deleteRecordColumn = deleteRecordColumn;
+ _hashFunction = hashFunction;
+ _partialUpsertHandler = partialUpsertHandler;
+ _enableSnapshot = enableSnapshot;
+ _enablePreload = enablePreload;
+ _metadataTTL = metadataTTL;
+ _deletedKeysTTL = deletedKeysTTL;
+ _tableIndexDir = tableIndexDir;
+ }
+
+ public TableConfig getTableConfig() {
+ return _tableConfig;
+ }
+
+ public Schema getSchema() {
+ return _schema;
+ }
+
+ public List<String> getPrimaryKeyColumns() {
+ return _primaryKeyColumns;
+ }
+
+ public List<String> getComparisonColumns() {
+ return _comparisonColumns;
+ }
+
+ public String getDeleteRecordColumn() {
+ return _deleteRecordColumn;
+ }
+
+ public HashFunction getHashFunction() {
+ return _hashFunction;
+ }
+
+ public PartialUpsertHandler getPartialUpsertHandler() {
+ return _partialUpsertHandler;
+ }
+
+ public boolean isSnapshotEnabled() {
+ return _enableSnapshot;
+ }
+
+ public boolean isPreloadEnabled() {
+ return _enablePreload;
+ }
+
+ public double getMetadataTTL() {
+ return _metadataTTL;
+ }
+
+ public double getDeletedKeysTTL() {
+ return _deletedKeysTTL;
+ }
+
+ public File getTableIndexDir() {
+ return _tableIndexDir;
+ }
+
+ public static class Builder {
+ private TableConfig _tableConfig;
+ private Schema _schema;
+ private List<String> _primaryKeyColumns;
+ private List<String> _comparisonColumns;
+ private String _deleteRecordColumn;
+ private HashFunction _hashFunction = HashFunction.NONE;
+ private PartialUpsertHandler _partialUpsertHandler;
+ private boolean _enableSnapshot;
+ private boolean _enablePreload;
+ private double _metadataTTL;
+ private double _deletedKeysTTL;
+ private File _tableIndexDir;
+
+ public Builder setTableConfig(TableConfig tableConfig) {
+ _tableConfig = tableConfig;
+ return this;
+ }
+
+ public Builder setSchema(Schema schema) {
+ _schema = schema;
+ return this;
+ }
+
+ public Builder setPrimaryKeyColumns(List<String> primaryKeyColumns) {
+ _primaryKeyColumns = primaryKeyColumns;
+ return this;
+ }
+
+ public Builder setComparisonColumns(List<String> comparisonColumns) {
+ _comparisonColumns = comparisonColumns;
+ return this;
+ }
+
+ public Builder setDeleteRecordColumn(String deleteRecordColumn) {
+ _deleteRecordColumn = deleteRecordColumn;
+ return this;
+ }
+
+ public Builder setHashFunction(HashFunction hashFunction) {
+ _hashFunction = hashFunction;
+ return this;
+ }
+
+ public Builder setPartialUpsertHandler(PartialUpsertHandler partialUpsertHandler) {
+ _partialUpsertHandler = partialUpsertHandler;
+ return this;
+ }
+
+ public Builder setEnableSnapshot(boolean enableSnapshot) {
+ _enableSnapshot = enableSnapshot;
+ return this;
+ }
+
+ public Builder setEnablePreload(boolean enablePreload) {
+ _enablePreload = enablePreload;
+ return this;
+ }
+
+ public Builder setMetadataTTL(double metadataTTL) {
+ _metadataTTL = metadataTTL;
+ return this;
+ }
+
+ public Builder setDeletedKeysTTL(double deletedKeysTTL) {
+ _deletedKeysTTL = deletedKeysTTL;
+ return this;
+ }
+
+ public Builder setTableIndexDir(File tableIndexDir) {
+ _tableIndexDir = tableIndexDir;
+ return this;
+ }
+
+ public UpsertContext build() {
+ Preconditions.checkState(_tableConfig != null, "Table config must be set");
+ Preconditions.checkState(_schema != null, "Schema must be set");
+ Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), "Primary key columns must be set");
+ Preconditions.checkState(CollectionUtils.isNotEmpty(_comparisonColumns), "Comparison columns must be set");
+ Preconditions.checkState(_hashFunction != null, "Hash function must be set");
+ Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set");
+ return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn,
+ _hashFunction, _partialUpsertHandler, _enableSnapshot, _enablePreload, _metadataTTL, _deletedKeysTTL,
+ _tableIndexDir);
+ }
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index fe17e40dc2..38bc66ade5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -40,21 +40,35 @@ import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class MutableSegmentImplUpsertComparisonColTest {
private static final String SCHEMA_FILE_PATH = "data/test_upsert_comparison_col_schema.json";
private static final String DATA_FILE_PATH = "data/test_upsert_comparison_col_data.json";
- private static CompositeTransformer _recordTransformer;
- private static Schema _schema;
- private static TableConfig _tableConfig;
- private static MutableSegmentImpl _mutableSegmentImpl;
- private static PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+ private TableDataManager _tableDataManager;
+ private TableConfig _tableConfig;
+ private Schema _schema;
+ private CompositeTransformer _recordTransformer;
+ private MutableSegmentImpl _mutableSegmentImpl;
+ private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+
+ @BeforeClass
+ public void setUp() {
+ ServerMetrics.register(mock(ServerMetrics.class));
+ _tableDataManager = mock(TableDataManager.class);
+ when(_tableDataManager.getTableDataDir()).thenReturn(new File(REALTIME_TABLE_NAME));
+ }
private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) {
UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL);
@@ -67,20 +81,19 @@ public class MutableSegmentImplUpsertComparisonColTest {
throws Exception {
URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
- _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfig)
- .build();
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build();
+ _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
- tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
- mock(HelixManager.class), mock(ExecutorService.class));
+ tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class),
+ mock(ExecutorService.class));
_partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
- Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch",
- _partitionUpsertMetadataManager, null);
+ Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch", _partitionUpsertMetadataManager,
+ null);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
_schema.getColumnNames(), null)) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 4f243c6000..702bd47f80 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -41,22 +41,36 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class MutableSegmentImplUpsertTest {
private static final String SCHEMA_FILE_PATH = "data/test_upsert_schema.json";
private static final String DATA_FILE_PATH = "data/test_upsert_data.json";
- private CompositeTransformer _recordTransformer;
- private Schema _schema;
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+ private TableDataManager _tableDataManager;
private TableConfig _tableConfig;
+ private Schema _schema;
+ private CompositeTransformer _recordTransformer;
private MutableSegmentImpl _mutableSegmentImpl;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ @BeforeClass
+ public void setUp() {
+ ServerMetrics.register(mock(ServerMetrics.class));
+ _tableDataManager = mock(TableDataManager.class);
+ when(_tableDataManager.getTableDataDir()).thenReturn(new File(REALTIME_TABLE_NAME));
+ }
+
private UpsertConfig createPartialUpsertConfig(HashFunction hashFunction) {
UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
upsertConfigWithHash.setPartialUpsertStrategies(new HashMap<>());
@@ -76,15 +90,15 @@ public class MutableSegmentImplUpsertTest {
throws Exception {
URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
- _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfigWithHash)
.setNullHandlingEnabled(true).build();
+ _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
- tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
- mock(HelixManager.class), mock(ExecutorService.class));
+ tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class),
+ mock(ExecutorService.class));
_partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 35e621245c..17a1e048dc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -48,7 +48,9 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
@@ -58,6 +60,7 @@ import org.mockito.MockedConstruction;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
@@ -72,13 +75,25 @@ import static org.testng.Assert.*;
public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final List<String> PRIMARY_KEY_COLUMNS = Collections.singletonList("pk");
+ private static final List<String> COMPARISON_COLUMNS = Collections.singletonList("timeCol");
+ private static final String DELETE_RECORD_COLUMN = "deleteCol";
private static final File INDEX_DIR =
new File(FileUtils.getTempDirectory(), "ConcurrentMapPartitionUpsertMetadataManagerTest");
+ private UpsertContext.Builder _contextBuilder;
+
@BeforeClass
public void setUp()
throws IOException {
FileUtils.forceMkdir(INDEX_DIR);
+ ServerMetrics.register(mock(ServerMetrics.class));
+ }
+
+ @BeforeMethod
+ public void setUpContextBuilder() {
+ _contextBuilder = new UpsertContext.Builder().setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
+ .setPrimaryKeyColumns(PRIMARY_KEY_COLUMNS).setComparisonColumns(COMPARISON_COLUMNS).setTableIndexDir(INDEX_DIR);
}
@AfterClass
@@ -90,9 +105,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@Test
public void testStartFinishOperation() {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
// Start 2 operations
assertTrue(upsertMetadataManager.startOperation());
@@ -150,6 +163,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@Test
public void testUpsertMetadataCleanupWithTTLConfig()
throws IOException {
+ _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
@@ -165,6 +179,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@Test
public void testGetQueryableDocIds() {
+ _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);
+
boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true, false};
int[] docIds1 = new int[]{2, 4, 5};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -202,11 +218,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
throws IOException {
- String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+ _contextBuilder.setHashFunction(hashFunction).build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -354,6 +368,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@Test
public void testAddReplaceRemoveSegmentWithRecordDelete()
throws IOException {
+ _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);
verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE, false);
verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5, false);
verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, false);
@@ -364,12 +379,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction, boolean enableSnapshot)
throws IOException {
- String comparisonColumn = "timeCol";
- String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0,
- 0, INDEX_DIR, mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+ _contextBuilder.setHashFunction(hashFunction).setEnableSnapshot(enableSnapshot).build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -548,11 +560,11 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
}
private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKeys, int[] timestamps,
- @Nullable boolean[] deleteRecordFlags) {
+ @Nullable boolean[] deleteRecordFlags) {
List<RecordInfo> recordInfoList = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new Integer(timestamps[i]),
- deleteRecordFlags != null && deleteRecordFlags[i]));
+ deleteRecordFlags != null && deleteRecordFlags[i]));
}
return recordInfoList;
}
@@ -667,11 +679,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddRecord(HashFunction hashFunction)
throws IOException {
- String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+ _contextBuilder.setHashFunction(hashFunction).build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -760,11 +770,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddOutOfOrderRecord(HashFunction hashFunction)
throws IOException {
- String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+ _contextBuilder.setHashFunction(hashFunction).build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -828,11 +836,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
}
private void verifyPreloadSegment(HashFunction hashFunction) {
- String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+ _contextBuilder.setHashFunction(hashFunction).build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -876,6 +882,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@Test
public void testAddRecordWithDeleteColumn()
throws IOException {
+ _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN);
verifyAddRecordWithDeleteColumn(HashFunction.NONE);
verifyAddRecordWithDeleteColumn(HashFunction.MD5);
verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
@@ -883,12 +890,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
throws IOException {
- String comparisonColumn = "timeCol";
- String deleteColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
- false, 0, 0, INDEX_DIR, mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+ _contextBuilder.setHashFunction(hashFunction).build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// queryableDocIds is same as validDocIds in the absence of delete markers
@@ -985,21 +989,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
@Test
public void testRemoveExpiredDeletedKeys()
- throws IOException {
+ throws IOException {
+ _contextBuilder.setDeleteRecordColumn(DELETE_RECORD_COLUMN).setDeletedKeysTTL(20);
verifyRemoveExpiredDeletedKeys(HashFunction.NONE);
verifyRemoveExpiredDeletedKeys(HashFunction.MD5);
verifyRemoveExpiredDeletedKeys(HashFunction.MURMUR3);
}
private void verifyRemoveExpiredDeletedKeys(HashFunction hashFunction)
- throws IOException {
-
- String comparisonColumn = "timeCol";
- String deleteColumn = "deleteCol";
+ throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
- false, 0, 20, INDEX_DIR, mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+ _contextBuilder.setHashFunction(hashFunction).build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -1010,9 +1011,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment1 =
- mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
+ mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
- getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator());
+ getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator());
// Update records from the second segment
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
@@ -1069,16 +1070,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2});
assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue)
throws IOException {
- File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
-
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, 0, tableDir,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1095,8 +1098,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
ImmutableSegmentImpl segment1 =
- mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
- earlierComparisonValue, null);
+ mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, earlierComparisonValue,
+ null);
int[] docIds1 = new int[]{0, 1, 2, 3};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -1141,12 +1144,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddOutOfTTLSegment()
throws IOException {
- File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
-
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1163,8 +1162,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
ImmutableSegmentImpl segment1 =
- mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
- new Double(80), null);
+ mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, new Double(80), null);
int[] docIds1 = new int[]{0, 1, 2, 3};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -1199,8 +1197,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
validDocIdsSnapshot2.add(docIds2);
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
ImmutableSegmentImpl segment2 =
- mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, Collections.singletonList("timeCol"),
- new Double(80), validDocIdsSnapshot2);
+ mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, COMPARISON_COLUMNS, new Double(80),
+ validDocIdsSnapshot2);
upsertMetadataManager.addSegment(segment2);
// out of ttl segment should not be added to recordLocationMap
assertEquals(recordLocationMap.size(), 5);
@@ -1214,12 +1212,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddOutOfTTLSegmentWithRecordDelete()
throws IOException {
- String comparisonColumn = "timeCol";
- String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
- 0, INDEX_DIR, mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -1235,8 +1229,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
int[] docIds1 = new int[]{2, 4, 5};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
validDocIdsSnapshot1.add(docIds1);
- ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1,
- Collections.singletonList(comparisonColumn), new Double(120), validDocIdsSnapshot1);
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1, COMPARISON_COLUMNS,
+ new Double(120), validDocIdsSnapshot1);
// get recordInfo from validDocIdSnapshot.
// segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
@@ -1266,7 +1261,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
validDocIdsSnapshot2.add(docIds2);
ImmutableSegmentImpl segment2 =
mockImmutableSegmentWithEndTime(2, validDocIds2, queryableDocIds2, getPrimaryKeyList(numRecords, primaryKeys),
- Collections.singletonList(comparisonColumn), new Double(40), validDocIdsSnapshot2);
+ COMPARISON_COLUMNS, new Double(40), validDocIdsSnapshot2);
// get recordInfo from validDocIdSnapshot.
// segment2 snapshot: 3 -> {3, 40}, 4 -> {4, 40}
@@ -1299,12 +1294,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[] deleteFlags,
MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap queryableDocIds) {
- String comparisonColumn = "timeCol";
- String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
- 0, INDEX_DIR, mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
(mockReader, context) -> {
@@ -1318,9 +1309,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length);
when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
- this.put(comparisonColumn, columnMetadata);
+ this.put(COMPARISON_COLUMNS.get(0), columnMetadata);
}});
- when(columnMetadata.getMaxValue()).thenReturn(null);
ImmutableSegmentImpl segment =
mockImmutableSegmentWithSegmentMetadata(1, new ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata,
@@ -1331,12 +1321,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddSegmentForTTL(Comparable comparisonValue)
throws IOException {
- File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
-
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1352,8 +1338,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
ImmutableSegmentImpl segment1 =
- mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), -1,
- null);
+ mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, -1, null);
int[] docIds1 = new int[]{0, 1, 2, 3};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
@@ -1395,9 +1380,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyPersistAndLoadWatermark()
throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
- new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
double currentTimeMs = System.currentTimeMillis();
upsertMetadataManager.persistWatermark(currentTimeMs);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
index 526e0920c3..8ceeb4ce0b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManagerTest.java
@@ -59,12 +59,19 @@ import static org.testng.Assert.assertTrue;
public class ConcurrentMapTableUpsertMetadataManagerTest {
private static final File TEMP_DIR =
new File(FileUtils.getTempDirectory(), "ConcurrentMapTableUpsertMetadataManagerTest");
+ private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+ private static final File TABLE_DATA_DIR = new File(TEMP_DIR, REALTIME_TABLE_NAME);
+
+ private TableDataManager _tableDataManager;
private ExecutorService _segmentPreloadExecutor;
@BeforeClass
public void setUp()
throws Exception {
FileUtils.deleteQuietly(TEMP_DIR);
+ ServerMetrics.register(mock(ServerMetrics.class));
+ _tableDataManager = mock(TableDataManager.class);
+ when(_tableDataManager.getTableDataDir()).thenReturn(TABLE_DATA_DIR);
_segmentPreloadExecutor = Executors.newFixedThreadPool(1);
}
@@ -86,16 +93,14 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
// Preloading is skipped as snapshot is not enabled.
ConcurrentMapTableUpsertMetadataManager mgr = new ConcurrentMapTableUpsertMetadataManager();
assertFalse(mgr.isPreloading());
- mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
- _segmentPreloadExecutor);
+ mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor);
assertFalse(mgr.isPreloading());
// Preloading is skipped as preloading is not turned on.
upsertConfig.setEnableSnapshot(true);
mgr = new ConcurrentMapTableUpsertMetadataManager();
assertFalse(mgr.isPreloading());
- mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
- _segmentPreloadExecutor);
+ mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor);
assertFalse(mgr.isPreloading());
upsertConfig.setEnablePreload(true);
@@ -103,8 +108,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
assertFalse(mgr.isPreloading());
// The preloading logic will hit on error as the HelixManager mock is not fully setup. But failure of preloading
// should not fail the init() method.
- mgr.init(tableConfig, schema, mock(TableDataManager.class), mock(ServerMetrics.class), mock(HelixManager.class),
- _segmentPreloadExecutor);
+ mgr.init(tableConfig, schema, _tableDataManager, mock(HelixManager.class), _segmentPreloadExecutor);
assertFalse(mgr.isPreloading());
}
@@ -141,14 +145,13 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
};
// Setup mocks for TableConfig and Schema.
- String tableNameWithType = "myTable_REALTIME";
TableConfig tableConfig = mock(TableConfig.class);
UpsertConfig upsertConfig = new UpsertConfig();
upsertConfig.setComparisonColumn("ts");
upsertConfig.setEnablePreload(true);
upsertConfig.setEnableSnapshot(true);
when(tableConfig.getUpsertConfig()).thenReturn(upsertConfig);
- when(tableConfig.getTableName()).thenReturn(tableNameWithType);
+ when(tableConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
Schema schema = mock(Schema.class);
when(schema.getPrimaryKeyColumns()).thenReturn(Collections.singletonList("pk"));
@@ -169,16 +172,18 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
SegmentZKMetadata realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg01");
realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
when(propertyStore.get(
- eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg01")), any(),
+ eq(ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, "online_seg01")), any(),
anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
realtimeSegmentZKMetadata = new SegmentZKMetadata("online_seg02");
realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
when(propertyStore.get(
- eq(ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, "online_seg02")), any(),
+ eq(ZKMetadataProvider.constructPropertyStorePathForSegment(REALTIME_TABLE_NAME, "online_seg02")), any(),
anyInt())).thenReturn(realtimeSegmentZKMetadata.toZNRecord());
// No snapshot file for online_seg01, so it's skipped.
TableDataManager tableDataManager = mock(TableDataManager.class);
+ when(tableDataManager.getTableDataDir()).thenReturn(TABLE_DATA_DIR);
+
File seg01IdxDir = new File(TEMP_DIR, "online_seg01");
FileUtils.forceMkdir(seg01IdxDir);
when(tableDataManager.getSegmentDataDir("online_seg01", null, tableConfig)).thenReturn(seg01IdxDir);
@@ -189,7 +194,7 @@ public class ConcurrentMapTableUpsertMetadataManagerTest {
when(tableDataManager.getSegmentDataDir("online_seg02", null, tableConfig)).thenReturn(seg02IdxDir);
assertFalse(mgr.isPreloading());
- mgr.init(tableConfig, schema, tableDataManager, mock(ServerMetrics.class), helixManager, _segmentPreloadExecutor);
+ mgr.init(tableConfig, schema, tableDataManager, helixManager, _segmentPreloadExecutor);
assertEquals(preloadedSegments.size(), 1);
assertTrue(preloadedSegments.contains("online_seg02"));
assertTrue(wasPreloading.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org