Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/08 22:09:06 UTC
[pinot] branch master updated: enable MergeRollupTask on realtime tables (#9890)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d3ea8dc162 enable MergeRollupTask on realtime tables (#9890)
d3ea8dc162 is described below
commit d3ea8dc1628a885f4faa8e8cf7b7c69c24782f79
Author: Haitao Zhang <ha...@startree.ai>
AuthorDate: Thu Dec 8 14:09:00 2022 -0800
enable MergeRollupTask on realtime tables (#9890)
* enable MergeRollupTask on realtime tables
* disallow when upsert or dedup is enabled
* address comments
* add integration test
* address comments
* add license
* add comments to explain the logic
* address comments
* fix r2o task
* allow more waiting time on conditions
* add output for test failure reason check
* use longer condition check interval
* add wait before condition check
* make it work in github integration test
* add a comment to the test case
* update comments
---
.../MergeRollupMinionClusterIntegrationTest.java | 138 ++++++++++++++++++-
.../BaseMultipleSegmentsConversionExecutor.java | 13 +-
.../mergerollup/MergeRollupTaskGenerator.java | 148 +++++++++++++++------
.../mergerollup/MergeRollupTaskGeneratorTest.java | 95 ++++++++++---
.../apache/pinot/spi/config/table/TableConfig.java | 10 ++
5 files changed, 341 insertions(+), 63 deletions(-)
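For reference, a minimal sketch (not part of this commit) of how a realtime table could enable the MergeRollupTask through its task config, mirroring the getTaskConfig() override added in the integration test below; the "100days.*" keys, values, and the sketch class name are illustrative only and are taken from that test:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.TableTaskConfig;

public class MergeRollupRealtimeTaskConfigSketch {
  public static TableTaskConfig buildTaskConfig() {
    // Per-merge-level settings keyed as "<mergeLevel>.<property>", as in the test below
    Map<String, String> mergeRollupConfigs = new HashMap<>();
    mergeRollupConfigs.put("100days.mergeType", "concat");
    mergeRollupConfigs.put("100days.bufferTimePeriod", "1d");
    mergeRollupConfigs.put("100days.bucketTimePeriod", "100d");
    mergeRollupConfigs.put("100days.maxNumRecordsPerSegment", "15000");
    mergeRollupConfigs.put("100days.maxNumRecordsPerTask", "15000");
    mergeRollupConfigs.put("WeatherDelay.aggregationType", "sum");
    // Wrap the per-level configs under the MergeRollupTask task type
    return new TableTaskConfig(
        Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, mergeRollupConfigs));
  }
}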
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 9602275068..87e6eb008d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -72,6 +72,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
private static final String SINGLE_LEVEL_ROLLUP_TEST_TABLE = "myTable2";
private static final String MULTI_LEVEL_CONCAT_TEST_TABLE = "myTable3";
private static final String SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE = "myTable4";
+ private static final String SINGLE_LEVEL_CONCAT_TEST_REALTIME_TABLE = "myTable5";
private static final long TIMEOUT_IN_MS = 10_000L;
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
@@ -98,6 +99,8 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
startController();
startBrokers(1);
startServers(1);
+ // Start Kafka
+ startKafka();
// Create and upload the schema and table config
Schema schema = createSchema();
@@ -133,6 +136,13 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
uploadSegments(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, _tarDir4);
+ // create the realtime table
+ TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+ addTableConfig(tableConfig);
+ // Push data into Kafka
+ pushAvroIntoKafka(avroFiles);
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
// Set up the H2 connection
setUpH2Connection(avroFiles);
@@ -146,6 +156,26 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
_pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
}
+ // this override is used by createRealtimeTableConfig
+ @Override
+ protected String getTableName() {
+ return SINGLE_LEVEL_CONCAT_TEST_REALTIME_TABLE;
+ }
+
+ // this override is used by createRealtimeTableConfig
+ @Override
+ protected TableTaskConfig getTaskConfig() {
+ Map<String, String> tableTaskConfigs = new HashMap<>();
+ tableTaskConfigs.put("100days.mergeType", "concat");
+ tableTaskConfigs.put("100days.bufferTimePeriod", "1d");
+ tableTaskConfigs.put("100days.bucketTimePeriod", "100d");
+ tableTaskConfigs.put("100days.maxNumRecordsPerSegment", "15000");
+ tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");
+ tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
+ tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
+ return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
+ }
+
private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig) {
return createOfflineTableConfig(tableName, taskConfig, null);
}
@@ -265,7 +295,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
* Test single level concat task with maxNumRecordPerTask, maxNumRecordPerSegment constraints
*/
@Test
- public void testSingleLevelConcat()
+ public void testOfflineTableSingleLevelConcat()
throws Exception {
// The original segments are time partitioned by month:
// segmentName (totalDocs)
@@ -378,7 +408,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
* Push type is set to Metadata
*/
@Test
- public void testSingleLevelConcatWithMetadataPush()
+ public void testOfflineTableSingleLevelConcatWithMetadataPush()
throws Exception {
// The original segments are time partitioned by month:
// segmentName (totalDocs)
@@ -490,7 +520,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
* Test single level rollup task with duplicate data (original segments * 2)
*/
@Test
- public void testSingleLevelRollup()
+ public void testOfflineTableSingleLevelRollup()
throws Exception {
// The original segments are time partitioned by month:
// segmentName (totalDocs)
@@ -602,7 +632,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
* Test multi level concat task
*/
@Test
- public void testMultiLevelConcat()
+ public void testOfflineTableMultiLevelConcat()
throws Exception {
// The original segments are time partitioned by month:
// segmentName (totalDocs)
@@ -769,6 +799,105 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
}, 600_000L, "Failed to complete task");
}
+ // The use case is similar to the one defined for the offline table
+ @Test
+ public void testRealtimeTableSingleLevelConcat()
+ throws Exception {
+ // The original segments:
+ // mytable__0__0__{ts00} ... mytable__0__23__{ts023}
+ // mytable__1__0__{ts10} ... mytable__1__22__{ts122}
+ //
+ // Expected result segments:
+ // merged_100days_{ts1}_0_mytable_16071_16099_0
+ // merged_100days_{ts2}_0_mytable_16100_16154_0
+ // merged_100days_{ts2}_0_mytable_16101_16146_1
+ // merged_100days_{ts2}_1_mytable_16147_16199_0
+ // merged_100days_{ts2}_2_mytable_16196_16199_0
+ // merged_100days_{ts3}_0_mytable_16200_16252_1
+ // merged_100days_{ts3}_0_mytable_16200_16252_0
+ // merged_100days_{ts3}_1_mytable_16245_16295_0
+ // merged_100days_{ts3}_2_mytable_16290_16299_0
+ // merged_100days_{ts4}_0_mytable_16300_16359_0
+ // merged_100days_{ts4}_0_mytable_16323_16345_1
+ // merged_100days_{ts4}_1_mytable_16358_16399_0
+ // merged_100days_{ts5}_0_mytable_16400_16435_0
+ // mytable__0__23__{ts023} (in progress)
+ // mytable__1__22__{ts122} (in progress)
+ PinotHelixTaskResourceManager helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+ PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+ PinotHelixResourceManager pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+ String tableName = getTableName();
+
+ String sqlQuery = "SELECT count(*) FROM " + tableName; // 115545 rows for the test table
+ JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+ int[] expectedNumSubTasks = {1, 3, 3, 2, 1};
+ int[] expectedNumSegmentsQueried = {44, 37, 26, 18, 15};
+ long expectedWatermark = 16000 * 86_400_000L;
+ String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ int numTasks = 0;
+ for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
+ tasks != null; tasks =
+ taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
+ assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
+ assertTrue(helixTaskResourceManager.getTaskQueues()
+ .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
+
+ // Will not schedule task if there's incomplete task
+ assertNull(
+ taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ waitForTaskToComplete();
+
+ // Check watermark
+ MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord(
+ taskManager.getClusterInfoAccessor()
+ .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, realtimeTableName));
+ assertNotNull(minionTaskMetadata);
+ assertEquals((long) minionTaskMetadata.getWatermarkMap().get("100days"), expectedWatermark);
+ expectedWatermark += 100 * 86_400_000L;
+
+ // Check metadata of merged segments
+ for (SegmentZKMetadata metadata : pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) {
+ if (metadata.getSegmentName().startsWith("merged")) {
+ // Check merged segment zk metadata
+ assertNotNull(metadata.getCustomMap());
+ assertEquals("100days",
+ metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
+ // Check merged segments are time partitioned
+ assertEquals(metadata.getEndTimeMs() / (86_400_000L * 100), metadata.getStartTimeMs() / (86_400_000L * 100));
+ }
+ }
+
+ final int finalNumTasks = numTasks;
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ // Check num total doc of merged segments are the same as the original segments
+ JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+ if (!SqlResultComparator.areEqual(actualJson, expectedJson, sqlQuery)) {
+ return false;
+ }
+ // Check query routing
+ int numSegmentsQueried = actualJson.get("numSegmentsQueried").asInt();
+ return numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks]
+ // when running on github tests, the consumer sometimes queries one more segment
+ || numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks] + 1;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, TIMEOUT_IN_MS, "Timeout while validating segments");
+ }
+ // Check total tasks
+ assertEquals(numTasks, 5);
+
+ assertTrue(_controllerStarter.getControllerMetrics()
+ .containsGauge("mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"));
+
+ // Drop the table
+ dropRealtimeTable(tableName);
+
+ // Check if the task metadata is cleaned up on table deletion
+ verifyTableDelete(realtimeTableName);
+ }
+
@AfterClass
public void tearDown()
throws Exception {
@@ -776,6 +905,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
stopServer();
stopBroker();
stopController();
+ stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index c022c82ca2..c5b5f2b27a 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -49,6 +49,7 @@ import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -275,7 +276,17 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true");
NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
TableNameBuilder.extractRawTableName(tableNameWithType));
- List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
+ NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+ TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
+ // RealtimeToOfflineSegmentsTask pushes segments to the corresponding offline table
+ // TODO: It is not clean to put the override here; we should think harder about the proper
+ // way to override it.
+ if (MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) {
+ tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+ TableType.OFFLINE.toString());
+ }
+ List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter,
+ tableTypeParameter);
pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters,
segmentConversionResult);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 6043320495..9da496348f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.minion.tasks.mergerollup;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.commons.lang3.StringUtils;
@@ -63,13 +65,26 @@ import org.slf4j.LoggerFactory;
/**
* A {@link PinotTaskGenerator} implementation for generating tasks of type {@link MergeRollupTask}
*
- * TODO: Add the support for realtime table
+ * Assumptions:
+ * - When the MergeRollupTask starts the first time, records older than the min(now ms, max end time ms of all ready to
+ * process segments) - bufferTimeMs have already been ingested. If not, newly ingested records older than that time
+ * may not be properly merged (due to the latest watermarks having advanced too far before those records are ingested).
+ * - If it is needed, there are backfill protocols to ingest and replace records older than the latest watermarks.
+ * Those protocols can handle time alignment (according to merge levels configurations) correctly.
+ * - If it is needed, there are reconcile protocols to merge & rollup newly ingested segments that are (1) older than
+ * the latest watermarks, and (2) not time aligned according to merge levels configurations
+ * - For realtime tables, those protocols are needed if streaming records arrive late (older than the latest
+ * watermarks)
+ * - For offline tables, those protocols are needed if there are non-time-aligned segments ingested accidentally.
*
- * Steps:
*
+ * Steps:
* - Pre-select segments:
* - Fetch all segments, select segments based on segment lineage (removing segmentsFrom for COMPLETED lineage
* entry and segmentsTo for IN_PROGRESS lineage entry)
+ * - For realtime tables, remove
+ * - in-progress segments (Segment.Realtime.Status.IN_PROGRESS), and
+ * - sealed segments with start time later than the earliest start time of all in progress segments
* - Remove empty segments
* - Sort segments based on startTime and endTime in ascending order
*
@@ -133,22 +148,25 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
if (!validate(tableConfig, taskType)) {
continue;
}
- String offlineTableName = tableConfig.getTableName();
- LOGGER.info("Start generating task configs for table: {} for task: {}", offlineTableName, taskType);
+ String tableNameWithType = tableConfig.getTableName();
+ LOGGER.info("Start generating task configs for table: {} for task: {}", tableNameWithType, taskType);
// Get all segment metadata
- List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
+ List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+ // Filter segments based on status
+ List<SegmentZKMetadata> preSelectedSegmentsBasedOnStatus
+ = filterSegmentsBasedOnStatus(tableConfig.getTableType(), allSegments);
// Select current segment snapshot based on lineage, filter out empty segments
- SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(offlineTableName);
+ SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(tableNameWithType);
Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
- for (SegmentZKMetadata segment : allSegments) {
+ for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
}
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
- for (SegmentZKMetadata segment : allSegments) {
+ for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
if (preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0
&& MergeTaskUtils.allowMerge(segment)) {
preSelectedSegments.add(segment);
@@ -158,8 +176,8 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
if (preSelectedSegments.isEmpty()) {
// Reset the watermark time if no segment found. This covers the case where the table is newly created or
// all segments for the existing table got deleted.
- resetDelayMetrics(offlineTableName);
- LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, offlineTableName);
+ resetDelayMetrics(tableNameWithType);
+ LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, tableNameWithType);
continue;
}
@@ -186,7 +204,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
// Get incomplete merge levels
Set<String> inCompleteMergeLevels = new HashSet<>();
- for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, offlineTableName,
+ for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType,
_clusterInfoAccessor).entrySet()) {
for (PinotTaskConfig taskConfig : _clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
inCompleteMergeLevels.add(taskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
@@ -194,11 +212,11 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
}
ZNRecord mergeRollupTaskZNRecord = _clusterInfoAccessor
- .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName);
+ .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType);
int expectedVersion = mergeRollupTaskZNRecord != null ? mergeRollupTaskZNRecord.getVersion() : -1;
MergeRollupTaskMetadata mergeRollupTaskMetadata =
mergeRollupTaskZNRecord != null ? MergeRollupTaskMetadata.fromZNRecord(mergeRollupTaskZNRecord)
- : new MergeRollupTaskMetadata(offlineTableName, new TreeMap<>());
+ : new MergeRollupTaskMetadata(tableNameWithType, new TreeMap<>());
List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
// Schedule tasks from lowest to highest merge level (e.g. Hourly -> Daily -> Monthly -> Yearly)
@@ -211,7 +229,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
// Skip scheduling if there's incomplete task for current mergeLevel
if (inCompleteMergeLevels.contains(mergeLevel)) {
LOGGER.info("Found incomplete task of merge level: {} for the same table: {}, Skipping task generation: {}",
- mergeLevel, offlineTableName, taskType);
+ mergeLevel, tableNameWithType, taskType);
continue;
}
@@ -220,14 +238,14 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
if (bucketMs <= 0) {
LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod,
- offlineTableName, mergeLevel);
+ tableNameWithType, mergeLevel);
continue;
}
String bufferPeriod = mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
if (bufferMs < 0) {
LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0",
- bufferPeriod, offlineTableName, mergeLevel);
+ bufferPeriod, tableNameWithType, mergeLevel);
continue;
}
String maxNumParallelBucketsStr = mergeConfigs.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS);
@@ -235,7 +253,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
: DEFAULT_NUM_PARALLEL_BUCKETS;
if (maxNumParallelBuckets <= 0) {
LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0",
- maxNumParallelBuckets, offlineTableName, mergeLevel);
+ maxNumParallelBuckets, tableNameWithType, mergeLevel);
continue;
}
@@ -255,14 +273,14 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
lowestLevelMaxValidBucketEndTimeMs =
Math.max(lowestLevelMaxValidBucketEndTimeMs, currentValidBucketEndTimeMs);
}
- _tableLowestLevelMaxValidBucketEndTimeMs.put(offlineTableName, lowestLevelMaxValidBucketEndTimeMs);
+ _tableLowestLevelMaxValidBucketEndTimeMs.put(tableNameWithType, lowestLevelMaxValidBucketEndTimeMs);
}
// Create delay metrics even if there's no task scheduled, this helps the case that the controller is restarted
// but the metrics are not available until the controller schedules a valid task
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
+ createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet",
- bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
+ bucketStartMs, bucketEndMs, tableNameWithType, mergeLevel);
continue;
}
@@ -336,18 +354,18 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
}
if (selectedSegmentsForAllBuckets.isEmpty()) {
- LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", offlineTableName, mergeLevel);
+ LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", tableNameWithType, mergeLevel);
continue;
}
// Bump up watermark to the earliest start time of selected segments truncated to the closest bucket boundary
long newWatermarkMs = selectedSegmentsForAllBuckets.get(0).get(0).getStartTimeMs() / bucketMs * bucketMs;
mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
- LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", offlineTableName, mergeLevel,
+ LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", tableNameWithType, mergeLevel,
watermarkMs, newWatermarkMs);
// Update the delay metrics
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
+ createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
// Create task configs
int maxNumRecordsPerTask =
@@ -357,7 +375,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
if (segmentPartitionConfig == null) {
for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
pinotTaskConfigsForTable.addAll(
- createPinotTaskConfigs(selectedSegmentsPerBucket, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+ createPinotTaskConfigs(selectedSegmentsPerBucket, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
null, mergeConfigs, taskConfigs));
}
} else {
@@ -396,13 +414,13 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
List<Integer> partition = entry.getKey();
List<SegmentZKMetadata> partitionedSegments = entry.getValue();
pinotTaskConfigsForTable.addAll(
- createPinotTaskConfigs(partitionedSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+ createPinotTaskConfigs(partitionedSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
partition, mergeConfigs, taskConfigs));
}
if (!outlierSegments.isEmpty()) {
pinotTaskConfigsForTable.addAll(
- createPinotTaskConfigs(outlierSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+ createPinotTaskConfigs(outlierSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
null, mergeConfigs, taskConfigs));
}
}
@@ -416,11 +434,11 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
} catch (ZkException e) {
LOGGER.error(
"Version changed while updating merge/rollup task metadata for table: {}, skip scheduling. There are "
- + "multiple task schedulers for the same table, need to investigate!", offlineTableName);
+ + "multiple task schedulers for the same table, need to investigate!", tableNameWithType);
continue;
}
pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
- LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName,
+ LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", tableNameWithType,
taskType, pinotTaskConfigsForTable.size());
}
@@ -430,22 +448,70 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
return pinotTaskConfigs;
}
+ @VisibleForTesting
+ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
+ if (tableType == TableType.REALTIME) {
+ // For realtime table, don't process
+ // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
+ // 2. sealed segments with start time later than the earliest start time of all in progress segments
+ // This prevents records in those in-progress segments from ending up never merged.
+ //
+ // Note that we make the following two assumptions here:
+ // 1. streaming data consumer lags are negligible
+ // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
+ // than bufferTimeMS)
+ //
+ // We don't handle the following cases intentionally because it will be either overkill or too complex
+ // 1. New partition added. If new partitions are not picked up timely, the MergeRollupTask will move watermarks
+ // forward, and may not be able to merge some lately-created segments for those new partitions -- users should
+ // configure pinot properly to discover new partitions timely, or they should restart pinot servers manually
+ // for new partitions to be picked up
+ // 2. (1) no new in-progress segments are created for some partitions (2) new in-progress segments are created for
+ // partitions, but there is no record consumed (i.e, empty in-progress segments). In those two cases,
+ // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
+ // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
+ // records correctly.
+ long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+ for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+ if (!segmentZKMetadata.getStatus().isCompleted()
+ && segmentZKMetadata.getTotalDocs() > 0
+ && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
+ earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+ }
+ }
+ final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
+ return allSegments.stream()
+ .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
+ && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
+ .collect(Collectors.toList());
+ } else {
+ return allSegments;
+ }
+ }
+
/**
* Validate table config for merge/rollup task
*/
- private boolean validate(TableConfig tableConfig, String taskType) {
- String offlineTableName = tableConfig.getTableName();
- if (tableConfig.getTableType() != TableType.OFFLINE) {
- LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}, REALTIME table is not supported yet", taskType,
- offlineTableName);
- return false;
- }
-
+ @VisibleForTesting
+ static boolean validate(TableConfig tableConfig, String taskType) {
+ String tableNameWithType = tableConfig.getTableName();
if (REFRESH.equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig))) {
LOGGER.warn("Skip generating task: {} for non-APPEND table: {}, REFRESH table is not supported", taskType,
- offlineTableName);
+ tableNameWithType);
return false;
}
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+ if (tableConfig.isUpsertEnabled()) {
+ LOGGER.warn("Skip generating task: {} for table: {}, table with upsert enabled is not supported", taskType,
+ tableNameWithType);
+ return false;
+ }
+ if (tableConfig.isDedupEnabled()) {
+ LOGGER.warn("Skip generating task: {} for table: {}, table with dedup enabled is not supported", taskType,
+ tableNameWithType);
+ return false;
+ }
+ }
return true;
}
@@ -527,7 +593,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
* Create pinot task configs with selected segments and configs
*/
private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> selectedSegments,
- String offlineTableName, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
+ String tableNameWithType, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
Map<String, String> mergeConfigs, Map<String, String> taskConfigs) {
int numRecordsPerTask = 0;
List<List<String>> segmentNamesList = new ArrayList<>();
@@ -561,9 +627,9 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
for (int i = 0; i < segmentNamesList.size(); i++) {
String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR);
- Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(offlineTableName, taskConfigs,
+ Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs,
_clusterInfoAccessor);
- configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+ configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
configs.put(MinionConstants.SEGMENT_NAME_KEY,
StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL);
@@ -591,7 +657,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME
+ System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i
- + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(offlineTableName));
+ + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType));
pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
}
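For illustration, a minimal, hypothetical sketch of the watermark/bucket arithmetic the generator applies above (segment start times truncated to bucket boundaries; a bucket is generally only eligible once its end time falls behind now minus the buffer, with higher merge levels additionally checking the lower level's watermark). The 100-day bucket, 1-day buffer, and day-16071/day-16000 values come from the realtime integration test; everything else is illustrative:

public class MergeRollupWatermarkSketch {
  public static void main(String[] args) {
    long dayMs = 86_400_000L;
    long bucketMs = 100 * dayMs; // 100days.bucketTimePeriod
    long bufferMs = dayMs;       // 100days.bufferTimePeriod
    // Earliest start time of the selected unmerged segments (day 16071 in the test data)
    long earliestSelectedStartMs = 16_071 * dayMs;
    // Watermark is bumped to that start time truncated to the closest bucket boundary
    long newWatermarkMs = earliestSelectedStartMs / bucketMs * bucketMs; // day 16000, as asserted in the test
    long bucketEndMs = newWatermarkMs + bucketMs;
    // The bucket only becomes eligible for merging once its end time is older than now - bufferMs
    boolean canMerge = bucketEndMs <= System.currentTimeMillis() - bufferMs;
    System.out.println("watermarkMs=" + newWatermarkMs + ", canMerge=" + canMerge);
  }
}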
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 5830f76e86..c367bda735 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -40,12 +40,16 @@ import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
@@ -57,6 +61,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -67,15 +72,55 @@ import static org.testng.Assert.assertTrue;
public class MergeRollupTaskGeneratorTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
+ private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
private static final String DAILY = "daily";
private static final String MONTHLY = "monthly";
- private TableConfig getOfflineTableConfig(Map<String, Map<String, String>> taskConfigsMap) {
- return new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ private TableConfig getTableConfig(TableType tableType, Map<String, Map<String, String>> taskConfigsMap) {
+ return new TableConfigBuilder(tableType).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
.setTaskConfig(new TableTaskConfig(taskConfigsMap)).build();
}
+ @Test
+ public void testValidateIfMergeRollupCanBeEnabledOrNot() {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(RAW_TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN_NAME)
+ .build();
+ assertTrue(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.emptyList(), "REFRESH", "daily"));
+ tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(RAW_TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN_NAME)
+ .setIngestionConfig(ingestionConfig)
+ .build();
+ assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+ tableConfig = new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(RAW_TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN_NAME)
+ .build();
+ assertTrue(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+ tableConfig = new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(RAW_TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN_NAME)
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+ .build();
+ assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+ tableConfig = new TableConfigBuilder(TableType.REALTIME)
+ .setTableName(RAW_TABLE_NAME)
+ .setTimeColumnName(TIME_COLUMN_NAME)
+ .setDedupConfig(new DedupConfig(true, HashFunction.MD5))
+ .build();
+ assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+ }
+
/**
* Tests for some config checks
*/
@@ -84,21 +129,37 @@ public class MergeRollupTaskGeneratorTest {
ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(new HashMap<>());
- SegmentZKMetadata metadata1 = getSegmentZKMetadata("testTable__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
- when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists.newArrayList(metadata1));
+ // the two following segments will be skipped when generating tasks
+ SegmentZKMetadata realtimeTableSegmentMetadata1 =
+ getSegmentZKMetadata("testTable__0__0__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
+ realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+ SegmentZKMetadata realtimeTableSegmentMetadata2 =
+ getSegmentZKMetadata("testTable__1__0__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
+ when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2));
+
+ SegmentZKMetadata offlineTableSegmentMetadata =
+ getSegmentZKMetadata("testTable__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
+ when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(offlineTableSegmentMetadata));
MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
generator.init(mockClusterInfoProvide);
- // Skip task generation, if realtime table
- TableConfig offlineTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
- List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+ // Skip task generation, if the table is a realtime table and all segments are skipped
+ // We don't test realtime REFRESH table because this combination does not make sense
+ assertTrue(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.REALTIME,
+ Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2)).isEmpty());
+ TableConfig realtimeTableConfig = getTableConfig(TableType.REALTIME, new HashMap<>());
+ List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
- // Skip task generation, if REFRESH table
+ // Skip task generation, if the table is an offline REFRESH table
+ assertFalse(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.OFFLINE,
+ Lists.newArrayList(offlineTableSegmentMetadata)).isEmpty());
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", null));
- offlineTableConfig = getOfflineTableConfig(new HashMap<>());
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, new HashMap<>());
offlineTableConfig.setIngestionConfig(ingestionConfig);
pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
@@ -165,7 +226,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
.thenReturn(Lists.newArrayList(Collections.emptyList()));
@@ -191,7 +252,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
String segmentName1 = "testTable__1";
@@ -221,7 +282,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
String segmentName1 = "testTable__1";
@@ -249,7 +310,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
tableTaskConfigs.put("daily.maxNumRecordsPerTask", "5000000");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
String segmentName1 = "testTable__1";
@@ -299,7 +360,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
tableTaskConfigs.put("daily.maxNumParallelBuckets", "3");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
String segmentName1 = "testTable__1";
@@ -504,7 +565,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
tableTaskConfigs.put("daily.maxNumRecordsPerTask", "5000000");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
String segmentName1 = "testTable__1";
String segmentName2 = "testTable__2";
@@ -565,7 +626,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
tableTaskConfigs.put("daily.maxNumRecordsPerTask", "5000000");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
String segmentName1 = "testTable__1";
String segmentName2 = "testTable__2";
@@ -649,7 +710,7 @@ public class MergeRollupTaskGeneratorTest {
tableTaskConfigs.put("monthly.maxNumRecordsPerTask", "5000000");
taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
- TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+ TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
String segmentName1 = "testTable__1";
String segmentName2 = "testTable__2";
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 44fbc7f5c3..a71ddb47f6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -313,6 +313,11 @@ public class TableConfig extends BaseJsonConfig {
_dedupConfig = dedupConfig;
}
+ @JsonIgnore
+ public boolean isDedupEnabled() {
+ return _dedupConfig != null && _dedupConfig.isDedupEnabled();
+ }
+
@Nullable
public DimensionTableConfig getDimensionTableConfig() {
return _dimensionTableConfig;
@@ -347,6 +352,11 @@ public class TableConfig extends BaseJsonConfig {
return _upsertConfig == null ? UpsertConfig.Mode.NONE : _upsertConfig.getMode();
}
+ @JsonIgnore
+ public boolean isUpsertEnabled() {
+ return _upsertConfig != null && _upsertConfig.getMode() != UpsertConfig.Mode.NONE;
+ }
+
@JsonIgnore
@Nullable
public String getUpsertComparisonColumn() {