You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/26 17:11:52 UTC
[pinot] branch master updated: Segment compaction for upsert real-time tables (#10463)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4d72eb5a58 Segment compaction for upsert real-time tables (#10463)
4d72eb5a58 is described below
commit 4d72eb5a58ed7c9874076d9d488afb1a81eeb54f
Author: Robert Zych <ro...@gmail.com>
AuthorDate: Wed Jul 26 10:11:45 2023 -0700
Segment compaction for upsert real-time tables (#10463)
---
.../pinot/controller/BaseControllerStarter.java | 31 +--
.../helix/core/minion/ClusterInfoAccessor.java | 36 ++-
.../helix/core/minion/PinotTaskManager.java | 11 +-
.../apache/pinot/core/common/MinionConstants.java | 19 ++
...sertCompactionMinionClusterIntegrationTest.java | 228 +++++++++++++++++
.../test/resources/upsert_compaction_test.tar.gz | Bin 0 -> 9405 bytes
.../org/apache/pinot/minion/BaseMinionStarter.java | 17 +-
.../org/apache/pinot/minion/MinionContext.java | 10 +
.../tasks/BaseSingleSegmentConversionExecutor.java | 9 +-
.../UpsertCompactionTaskExecutor.java | 248 ++++++++++++++++++
.../UpsertCompactionTaskExecutorFactory.java | 49 ++++
.../UpsertCompactionTaskGenerator.java | 276 +++++++++++++++++++++
...psertCompactionTaskProgressObserverFactory.java | 33 +++
.../UpsertCompactionTaskExecutorTest.java | 63 +++++
.../UpsertCompactionTaskGeneratorTest.java | 273 ++++++++++++++++++++
.../segment/local/utils/TableConfigUtils.java | 69 ++++--
.../segment/local/utils/TableConfigUtilsTest.java | 136 ++++++----
.../pinot/server/api/resources/TablesResource.java | 96 +++++--
.../pinot/server/api/TablesResourceTest.java | 81 +++---
19 files changed, 1531 insertions(+), 154 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index cc294a5341..e32a419ecf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -245,19 +245,19 @@ public abstract class BaseControllerStarter implements ServiceStartable {
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
// from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
// non-positive value, so set the default value as 1.
- System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _config
- .getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS,
+ System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
+ _config.getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS,
CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}
private void setupHelixClusterConstraints() {
- String maxStateTransitions = _config
- .getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS,
+ String maxStateTransitions =
+ _config.getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS,
CommonConstants.Helix.DEFAULT_HELIX_INSTANCE_MAX_STATE_TRANSITIONS);
Map<ClusterConstraints.ConstraintAttribute, String> constraintAttributes = new HashMap<>();
constraintAttributes.put(ClusterConstraints.ConstraintAttribute.INSTANCE, ".*");
- constraintAttributes
- .put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE, Message.MessageType.STATE_TRANSITION.name());
+ constraintAttributes.put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE,
+ Message.MessageType.STATE_TRANSITION.name());
ConstraintItem constraintItem = new ConstraintItem(constraintAttributes, maxStateTransitions);
_helixControllerManager.getClusterManagmentTool()
@@ -371,8 +371,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
private void setUpPinotController() {
// install default SSL context if necessary (even if not force-enabled everywhere)
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, ControllerConf.CONTROLLER_TLS_PREFIX);
- if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils
- .isNotBlank(tlsDefaults.getTrustStorePath())) {
+ if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
+ tlsDefaults.getTrustStorePath())) {
LOGGER.info("Installing default SSL context for any client requests");
TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
}
@@ -392,8 +392,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
_config.getProperty(CommonConstants.Controller.CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES));
LOGGER.info("Initializing Helix participant manager");
- _helixParticipantManager = HelixManagerFactory
- .getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT, _helixZkURL);
+ _helixParticipantManager =
+ HelixManagerFactory.getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT,
+ _helixZkURL);
// LeadControllerManager needs to be initialized before registering as Helix participant.
LOGGER.info("Initializing lead controller manager");
@@ -502,8 +503,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_adminApp.start(_listenerConfigs);
- _controllerMetrics.addCallbackGauge("dataDir.exists",
- () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
+ _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
File dataDir = new File(_config.getDataDir());
if (dataDir.exists()) {
@@ -673,7 +673,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
_taskManagerStatusCache = getTaskManagerStatusCache();
_taskManager =
new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
- _controllerMetrics, _taskManagerStatusCache);
+ _controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager);
periodicTasks.add(_taskManager);
_retentionManager =
new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
@@ -693,8 +693,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
periodicTasks.add(_segmentStatusChecker);
- _realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager,
- _controllerMetrics, _executorService);
+ _realtimeConsumerMonitor =
+ new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics,
+ _executorService);
periodicTasks.add(_realtimeConsumerMonitor);
_segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService, _connectionManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index bcb1491988..5551c04d7c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -21,7 +21,9 @@ package org.apache.pinot.controller.helix.core.minion;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import javax.annotation.Nullable;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.task.TaskState;
@@ -51,15 +53,20 @@ public class ClusterInfoAccessor {
private final ControllerConf _controllerConf;
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
+ private final Executor _executor;
+ private final MultiThreadedHttpConnectionManager _connectionManager;
public ClusterInfoAccessor(PinotHelixResourceManager pinotHelixResourceManager,
PinotHelixTaskResourceManager pinotHelixTaskResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) {
+ ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, Executor executor,
+ MultiThreadedHttpConnectionManager connectionManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_pinotHelixTaskResourceManager = pinotHelixTaskResourceManager;
_controllerConf = controllerConf;
_controllerMetrics = controllerMetrics;
_leadControllerManager = leadControllerManager;
+ _executor = executor;
+ _connectionManager = connectionManager;
}
/**
@@ -94,6 +101,20 @@ public class ClusterInfoAccessor {
return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
}
+ /**
+ * Get shared executor
+ */
+ public Executor getExecutor() {
+ return _executor;
+ }
+
+ /**
+ * Get shared connection manager
+ */
+ public MultiThreadedHttpConnectionManager getConnectionManager() {
+ return _connectionManager;
+ }
+
/**
* Fetches the ZNRecord under MINION_TASK_METADATA/${tableNameWithType}/${taskType} for the given
* taskType and tableNameWithType
@@ -114,8 +135,8 @@ public class ClusterInfoAccessor {
*/
@Nullable
public SegmentLineage getSegmentLineage(String tableNameWithType) {
- return SegmentLineageAccessHelper
- .getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+ return SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
+ tableNameWithType);
}
/**
@@ -127,8 +148,8 @@ public class ClusterInfoAccessor {
* @param expectedVersion The expected version of data to be overwritten. Set to -1 to override version check.
*/
public void setMinionTaskMetadata(BaseTaskMetadata taskMetadata, String taskType, int expectedVersion) {
- MinionTaskMetadataUtils
- .persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata, expectedVersion);
+ MinionTaskMetadataUtils.persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata,
+ expectedVersion);
}
/**
@@ -175,8 +196,9 @@ public class ClusterInfoAccessor {
* @return cluster config
*/
public String getClusterConfig(String configName) {
- HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
- .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+ HelixConfigScope helixConfigScope =
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+ _pinotHelixResourceManager.getHelixClusterName()).build();
Map<String, String> configMap =
_pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Collections.singletonList(configName));
return configMap != null ? configMap.get(configName) : null;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index e7a7584b76..4f10b6a5c5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -32,8 +32,10 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.helix.AccessOption;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
@@ -114,7 +116,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
ControllerConf controllerConf, ControllerMetrics controllerMetrics,
- TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache) {
+ TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor,
+ MultiThreadedHttpConnectionManager connectionManager) {
super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
controllerMetrics);
@@ -122,7 +125,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
_taskManagerStatusCache = taskManagerStatusCache;
_clusterInfoAccessor =
new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
- leadControllerManager);
+ leadControllerManager, executor, connectionManager);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
_skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
_maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
@@ -561,8 +564,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
long successRunTimestamp = System.currentTimeMillis();
for (TableConfig tableConfig : enabledTableConfigs) {
_taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(),
- taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(
- successRunTimestamp, errors.toString()));
+ taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp,
+ errors.toString()));
// before the first task schedule, the follow gauge metric will be empty
// TODO: find a better way to report task generation information
_controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 8e4c5cfc1c..25c5427cf6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -139,4 +139,23 @@ public class MinionConstants {
public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
"SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
}
+
+ public static class UpsertCompactionTask {
+ public static final String TASK_TYPE = "UpsertCompactionTask";
+ /**
+ * The time period to wait before picking segments for this task
+ * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
+ */
+ public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+ /**
+ * The maximum percent of old (invalid) records allowed for a completed segment.
+ * e.g. if set to 30, a segment may be compacted once more than 30 percent of its records are invalid
+ */
+ public static final String INVALID_RECORDS_THRESHOLD_PERCENT = "invalidRecordsThresholdPercent";
+ /**
+ * The maximum count of old (invalid) records allowed for a completed segment.
+ * e.g. if set to 100000, a segment may be compacted once it holds more than 100k invalid records
+ */
+ public static final String INVALID_RECORDS_THRESHOLD_COUNT = "invalidRecordsThresholdCount";
+ }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
new file mode 100644
index 0000000000..de645ca749
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+public class UpsertCompactionMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
+ protected PinotHelixTaskResourceManager _helixTaskResourceManager;
+ protected PinotTaskManager _taskManager;
+ private static final String PRIMARY_KEY_COL = "clientId";
+ private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
+ private static List<File> _avroFiles;
+ private TableConfig _tableConfig;
+ private Schema _schema;
+
+ @Override
+ protected String getSchemaFileName() {
+ return "upsert_upload_segment_test.schema";
+ }
+
+ @Override
+ protected String getSchemaName() {
+ return "upsertSchema";
+ }
+
+ @Override
+ protected String getAvroTarFileName() {
+ return "upsert_compaction_test.tar.gz";
+ }
+
+ @Override
+ protected String getPartitionColumn() {
+ return PRIMARY_KEY_COL;
+ }
+
+ private TableTaskConfig getCompactionTaskConfig() {
+ Map<String, String> tableTaskConfigs = new HashMap<>();
+ tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d");
+ tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, "1");
+ tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, "10");
+ return new TableTaskConfig(
+ Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, tableTaskConfigs));
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServers(1);
+
+ // Unpack the Avro files
+ _avroFiles = unpackAvroData(_tempDir);
+
+ startKafka();
+
+ // Create and upload the schema and table config
+ _schema = createSchema();
+ addSchema(_schema);
+ _tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+ _tableConfig.setTaskConfig(getCompactionTaskConfig());
+ addTableConfig(_tableConfig);
+
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, _tableConfig, _schema, 0, _segmentDir, _tarDir);
+
+ startMinion();
+ _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+ _taskManager = _controllerStarter.getTaskManager();
+ }
+
+ @BeforeMethod
+ public void beforeMethod()
+ throws Exception {
+ // Upload the segments that setUp() built into _tarDir
+ uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
+ }
+
+ protected void waitForAllDocsLoaded(long timeoutMs, long expectedCount)
+ throws Exception {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return getCurrentCountStarResultWithoutUpsert() == expectedCount;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+ }
+
+ private long getCurrentCountStarResultWithoutUpsert() {
+ return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)")
+ .getResultSet(0).getLong(0);
+ }
+
+ private long getSalary() {
+ return getPinotConnection().execute("SELECT salary FROM " + getTableName() + " WHERE clientId=100001")
+ .getResultSet(0).getLong(0);
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ return 3;
+ }
+
+ @AfterMethod
+ public void afterMethod()
+ throws Exception {
+ String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+
+ // Test dropping all segments one by one
+ List<String> segments = listSegments(realtimeTableName);
+ assertFalse(segments.isEmpty());
+ for (String segment : segments) {
+ dropSegment(realtimeTableName, segment);
+ }
+ // NOTE: There is a delay to remove the segment from property store
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ return listSegments(realtimeTableName).isEmpty();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60_000L, "Failed to drop the segments");
+
+ stopServer();
+ startServers(1);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ dropRealtimeTable(realtimeTableName);
+ stopMinion();
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ @Test
+ public void testCompaction()
+ throws Exception {
+ waitForAllDocsLoaded(600_000L, 283);
+ assertEquals(getSalary(), 9747108);
+
+ assertNotNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+ waitForTaskToComplete();
+ waitForAllDocsLoaded(600_000L, 3);
+ assertEquals(getSalary(), 9747108);
+ }
+
+ @Test
+ public void testCompactionDeletesSegments()
+ throws Exception {
+ pushAvroIntoKafka(_avroFiles);
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L, 566);
+ assertEquals(getSalary(), 9747108);
+
+ assertNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+ waitForTaskToComplete();
+ waitForAllDocsLoaded(600_000L, 283);
+ assertEquals(getSalary(), 9747108);
+ }
+
+ protected void waitForTaskToComplete() {
+ TestUtils.waitForCondition(input -> {
+ // Check task state
+ for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.UpsertCompactionTask.TASK_TYPE)
+ .values()) {
+ if (taskState != TaskState.COMPLETED) {
+ return false;
+ }
+ }
+ return true;
+ }, 600_000L, "Failed to complete task");
+ }
+}
diff --git a/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz
new file mode 100644
index 0000000000..76b0a50126
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz differ
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index 845ec95e66..3d3bfcbe0b 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -124,8 +124,8 @@ public abstract class BaseMinionStarter implements ServiceStartable {
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
// from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
// non-positive value, so set the default value as 1.
- System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _config
- .getProperty(CommonConstants.Helix.CONFIG_OF_MINION_FLAPPING_TIME_WINDOW_MS,
+ System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
+ _config.getProperty(CommonConstants.Helix.CONFIG_OF_MINION_FLAPPING_TIME_WINDOW_MS,
CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}
@@ -174,8 +174,8 @@ public abstract class BaseMinionStarter implements ServiceStartable {
// Initialize data directory
LOGGER.info("Initializing data directory");
- File dataDir = new File(_config
- .getProperty(CommonConstants.Helix.Instance.DATA_DIR_KEY, CommonConstants.Minion.DEFAULT_INSTANCE_DATA_DIR));
+ File dataDir = new File(_config.getProperty(CommonConstants.Helix.Instance.DATA_DIR_KEY,
+ CommonConstants.Minion.DEFAULT_INSTANCE_DATA_DIR));
if (dataDir.exists()) {
FileUtils.cleanDirectory(dataDir);
} else {
@@ -195,8 +195,8 @@ public abstract class BaseMinionStarter implements ServiceStartable {
// Install default SSL context if necessary (even if not force-enabled everywhere)
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, CommonConstants.Minion.MINION_TLS_PREFIX);
- if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils
- .isNotBlank(tlsDefaults.getTrustStorePath())) {
+ if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
+ tlsDefaults.getTrustStorePath())) {
LOGGER.info("Installing default SSL context for any client requests");
TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
}
@@ -237,8 +237,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
PinotConfiguration segmentUploaderConfig =
_config.subset(CommonConstants.Minion.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER);
if (segmentUploaderConfig.isEmpty()) {
- segmentUploaderConfig =
- _config.subset(CommonConstants.Minion.DEPRECATED_PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER);
+ segmentUploaderConfig = _config.subset(CommonConstants.Minion.DEPRECATED_PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER);
}
PinotConfiguration httpsConfig = segmentUploaderConfig.subset(CommonConstants.HTTPS_PROTOCOL);
if (httpsConfig.getProperty(HTTPS_ENABLED, false)) {
@@ -254,7 +253,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
_helixManager.connect();
updateInstanceConfigIfNeeded();
minionContext.setHelixPropertyStore(_helixManager.getHelixPropertyStore());
-
+ minionContext.setHelixManager(_helixManager);
LOGGER.info("Starting minion admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_minionAdminApplication = new MinionAdminApiApplication(_instanceId, _config);
_minionAdminApplication.start(_listenerConfigs);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
index 6ccd364677..8b6e0dc4c9 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
@@ -20,6 +20,7 @@ package org.apache.pinot.minion;
import java.io.File;
import javax.net.ssl.SSLContext;
+import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.MinionMetrics;
@@ -43,6 +44,7 @@ public class MinionContext {
private File _dataDir;
private MinionMetrics _minionMetrics;
private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
+ private HelixManager _helixManager;
// For segment upload
private SSLContext _sslContext;
@@ -107,4 +109,12 @@ public class MinionContext {
public void setTaskAuthProvider(AuthProvider taskAuthProvider) {
_taskAuthProvider = taskAuthProvider;
}
+
+ public void setHelixManager(HelixManager helixManager) {
+ _helixManager = helixManager;
+ }
+
+ public HelixManager getHelixManager() {
+ return _helixManager;
+ }
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 51c7f98543..001ce26d46 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -131,9 +131,12 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
"Converted segment name: %s does not match original segment name: %s",
segmentConversionResult.getSegmentName(), segmentName);
+ File convertedSegmentDir = segmentConversionResult.getFile();
+ if (convertedSegmentDir == null) {
+ return segmentConversionResult;
+ }
// Tar the converted segment
_eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: " + segmentName);
- File convertedSegmentDir = segmentConversionResult.getFile();
File convertedTarredSegmentFile =
new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedTarredSegmentFile);
@@ -184,8 +187,8 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
TableNameBuilder.extractRawTableName(tableNameWithType));
NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
- List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter,
- tableTypeParameter);
+ List<NameValuePair> parameters =
+ Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, tableTypeParameter);
// Upload the tarred segment
_eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + segmentName);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
new file mode 100644
index 0000000000..aa37ac871a
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Minion task executor that compacts a single segment of an upsert table: it fetches the segment's validDocIds
+ * bitmap from a server hosting the segment ONLINE and rebuilds the segment from the valid documents only.
+ */
+public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);
+  // NOTE(review): these are resolved once, when the class is initialized, so the HelixManager must already be set
+  // on the minion context at that point.
+  private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager();
+  private static HelixAdmin _clusterManagementTool = _helixManager.getClusterManagmentTool();
+  private static String _clusterName = _helixManager.getClusterName();
+
+  /**
+   * Record reader that only returns the documents of a segment whose docIds are present in the given validDocIds
+   * bitmap. It does not use any state of the enclosing executor, hence it is a static nested class.
+   */
+  private static class CompactedRecordReader implements RecordReader {
+    private final PinotSegmentRecordReader _pinotSegmentRecordReader;
+    // Kept so that the docId iterator can be re-created on rewind()
+    private final ImmutableRoaringBitmap _validDocIds;
+    private PeekableIntIterator _validDocIdsIterator;
+    // Reusable generic row to store the next row to return
+    private final GenericRow _nextRow = new GenericRow();
+    // Flag to mark whether we need to fetch another row
+    private boolean _nextRowReturned = true;
+
+    CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) {
+      _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+      _pinotSegmentRecordReader.init(indexDir, null, null);
+      _validDocIds = validDocIds;
+      _validDocIdsIterator = validDocIds.getIntIterator();
+    }
+
+    /**
+     * No-op: the reader is fully initialized by its constructor.
+     */
+    @Override
+    public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) {
+    }
+
+    /**
+     * Returns {@code true} when a row is available. The next valid document is fetched eagerly into the reusable
+     * row, and stays there until it is handed out by {@link #next(GenericRow)}.
+     */
+    @Override
+    public boolean hasNext() {
+      // A row has already been fetched but not handed out yet
+      if (!_nextRowReturned) {
+        return true;
+      }
+      // No more valid docIds to read
+      if (!_validDocIdsIterator.hasNext()) {
+        return false;
+      }
+      int docId = _validDocIdsIterator.next();
+      _nextRow.clear();
+      _pinotSegmentRecordReader.getRecord(docId, _nextRow);
+      _nextRowReturned = false;
+      return true;
+    }
+
+    @Override
+    public GenericRow next() {
+      return next(new GenericRow());
+    }
+
+    /**
+     * Copies the row fetched by the last {@link #hasNext()} call into {@code reuse}.
+     *
+     * @throws IllegalStateException if no fetched row is pending, i.e. {@link #hasNext()} was not called first
+     */
+    @Override
+    public GenericRow next(GenericRow reuse) {
+      Preconditions.checkState(!_nextRowReturned);
+      reuse.init(_nextRow);
+      _nextRowReturned = true;
+      return reuse;
+    }
+
+    /**
+     * Restarts the iteration from the first valid document.
+     */
+    @Override
+    public void rewind() {
+      _pinotSegmentRecordReader.rewind();
+      // The docId iterator is single-pass, so it has to be re-created; otherwise a rewound reader returns no rows
+      _validDocIdsIterator = _validDocIds.getIntIterator();
+      _nextRowReturned = true;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      _pinotSegmentRecordReader.close();
+    }
+  }
+
+  /**
+   * Rebuilds the segment in {@code indexDir} into {@code workingDir}, keeping only the valid documents.
+   *
+   * @return the conversion result; it carries no file when the segment has no valid documents, in which case no
+   *         segment is generated
+   */
+  @Override
+  protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
+      throws Exception {
+    _eventObserver.notifyProgress(pinotTaskConfig, "Compacting segment: " + indexDir);
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String taskType = pinotTaskConfig.getTaskType();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, configs);
+
+    if (validDocIds.isEmpty()) {
+      // prevents empty segment generation
+      LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: {}", tableNameWithType, segmentName);
+      if (indexDir.exists() && !FileUtils.deleteQuietly(indexDir)) {
+        LOGGER.warn("Failed to delete input segment: {}", indexDir.getAbsolutePath());
+      }
+      if (!FileUtils.deleteQuietly(workingDir)) {
+        LOGGER.warn("Failed to delete working directory: {}", workingDir.getAbsolutePath());
+      }
+      return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+          .build();
+    }
+
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    try (CompactedRecordReader compactedRecordReader = new CompactedRecordReader(indexDir, validDocIds)) {
+      SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName);
+      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config, compactedRecordReader);
+      driver.build();
+    }
+
+    File compactedSegmentFile = new File(workingDir, segmentName);
+    SegmentConversionResult result =
+        new SegmentConversionResult.Builder().setFile(compactedSegmentFile).setTableNameWithType(tableNameWithType)
+            .setSegmentName(segmentName).build();
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
+
+    return result;
+  }
+
+  /**
+   * Creates the generator config for the compacted segment, carrying over the name, creation time and time
+   * interval of the original segment.
+   */
+  private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
+      SegmentMetadataImpl segmentMetadata, String segmentName) {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, segmentMetadata.getSchema());
+    config.setOutDir(workingDir.getPath());
+    config.setSegmentName(segmentName);
+    // Keep index creation time the same as original segment because both segments use the same raw data.
+    // This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
+    // identify if the new pushed segment has newer data than the existing one.
+    config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));
+
+    // The time column type info is not stored in the segment metadata.
+    // Keep segment start/end time to properly handle time column type other than EPOCH (e.g. SIMPLE_FORMAT).
+    if (segmentMetadata.getTimeInterval() != null) {
+      config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
+      config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
+      config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
+      config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
+    }
+    return config;
+  }
+
+  /**
+   * Fetches the validDocIds bitmap of the segment from a server that hosts the segment ONLINE.
+   *
+   * @throws IllegalStateException if no such server exists or the server does not answer with status OK
+   */
+  // TODO: Consider moving this method to a more appropriate class (eg ServerSegmentMetadataReader)
+  private static ImmutableRoaringBitmap getValidDocIds(String tableNameWithType, Map<String, String> configs)
+      throws URISyntaxException {
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String server = getServer(segmentName, tableNameWithType);
+
+    // get the url for the validDocIds for the server
+    InstanceConfig instanceConfig = _clusterManagementTool.getInstanceConfig(_clusterName, server);
+    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+    String url =
+        new URIBuilder(endpoint).setPath(String.format("/segments/%s/%s/validDocIds", tableNameWithType, segmentName))
+            .toString();
+
+    // get the validDocIds from that server; both the client and the response hold resources and must be closed
+    javax.ws.rs.client.Client client = ClientBuilder.newClient();
+    try {
+      Response response = client.target(url).request().get(Response.class);
+      try {
+        Preconditions.checkState(response.getStatus() == Response.Status.OK.getStatusCode(),
+            "Unable to retrieve validDocIds from %s", url);
+        // The entity is read into a byte array up front, so the bitmap stays usable after the response is closed
+        byte[] snapshot = response.readEntity(byte[].class);
+        return new ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot));
+      } finally {
+        response.close();
+      }
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Returns the first server found in the external view of the table that hosts the given segment ONLINE.
+   *
+   * @throws IllegalStateException if the external view or the segment is missing, or no server hosts the segment
+   *         ONLINE
+   */
+  @VisibleForTesting
+  public static String getServer(String segmentName, String tableNameWithType) {
+    ExternalView externalView = _clusterManagementTool.getResourceExternalView(_clusterName, tableNameWithType);
+    if (externalView == null) {
+      throw new IllegalStateException("External view does not exist for table: " + tableNameWithType);
+    }
+    Map<String, String> instanceStateMap = externalView.getStateMap(segmentName);
+    if (instanceStateMap == null) {
+      throw new IllegalStateException("Failed to find segment: " + segmentName);
+    }
+    for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+      if (entry.getValue().equals(SegmentStateModel.ONLINE)) {
+        return entry.getKey();
+      }
+    }
+    throw new IllegalStateException("Failed to find ONLINE server for segment: " + segmentName);
+  }
+
+  /**
+   * Records the time of this compaction in the custom map of the segment ZK metadata.
+   */
+  @Override
+  protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
+      SegmentConversionResult segmentConversionResult) {
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
+        Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
+            String.valueOf(System.currentTimeMillis())));
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java
new file mode 100644
index 0000000000..8989892f77
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+/**
+ * Factory for {@link UpsertCompactionTaskExecutor}. The executor needs neither the ZK metadata manager nor the
+ * minion config, so both init methods do nothing.
+ */
+@TaskExecutorFactory
+public class UpsertCompactionTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  @Override
+  public PinotTaskExecutor create() {
+    return new UpsertCompactionTaskExecutor();
+  }
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+    // Nothing to initialize
+  }
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+    // Nothing to initialize
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
new file mode 100644
index 0000000000..aa84ffefa5
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generates upsert compaction tasks for realtime upsert tables. Completed segments older than the buffer period
+ * are selected, the servers are asked how many invalid documents each of them holds, and then segments holding
+ * only invalid documents are deleted while a task is generated for every segment exceeding the configured
+ * invalid record thresholds.
+ */
+@TaskGenerator
+public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "7d";
+  private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
+  private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0;
+
+  /**
+   * Outcome of the segment selection: the segments to compact and the names of the segments to delete.
+   */
+  public static class SegmentSelectionResult {
+
+    private final List<SegmentZKMetadata> _segmentsForCompaction;
+
+    private final List<String> _segmentsForDeletion;
+
+    SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) {
+      _segmentsForCompaction = segmentsForCompaction;
+      _segmentsForDeletion = segmentsForDeletion;
+    }
+
+    public List<SegmentZKMetadata> getSegmentsForCompaction() {
+      return _segmentsForCompaction;
+    }
+
+    public List<String> getSegmentsForDeletion() {
+      return _segmentsForDeletion;
+    }
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  /**
+   * Generates the compaction tasks for all given tables. Tables that are offline or do not have upsert enabled
+   * are skipped. As a side effect, segments that only contain invalid records are deleted.
+   */
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      if (!validate(tableConfig)) {
+        continue;
+      }
+
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {}", tableNameWithType);
+
+      Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+      List<SegmentZKMetadata> completedSegments = getCompletedSegments(tableNameWithType, taskConfigs);
+
+      if (completedSegments.isEmpty()) {
+        LOGGER.info("No completed segments were eligible for compaction for table: {}", tableNameWithType);
+        continue;
+      }
+
+      // get server to segment mappings
+      PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();
+      Map<String, List<String>> serverToSegments = pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      BiMap<String, String> serverToEndpoints;
+      try {
+        serverToEndpoints = pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+      } catch (InvalidConfigException e) {
+        throw new RuntimeException(e);
+      }
+
+      Map<String, SegmentZKMetadata> completedSegmentsMap =
+          completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity()));
+
+      List<String> validDocIdUrls;
+      try {
+        validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, tableNameWithType,
+            completedSegmentsMap.keySet());
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+
+      // request the urls from the servers
+      CompletionServiceHelper completionServiceHelper =
+          new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), _clusterInfoAccessor.getConnectionManager(),
+              serverToEndpoints.inverse());
+
+      CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+          completionServiceHelper.doMultiGetRequest(validDocIdUrls, tableNameWithType, false, 3000);
+
+      SegmentSelectionResult segmentSelectionResult =
+          processValidDocIdMetadata(taskConfigs, completedSegmentsMap, serviceResponse._httpResponses.entrySet());
+
+      if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
+        // NOTE(review): "0d" is presumably a zero-day retention period for the deleted segments - confirm
+        pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(),
+            "0d");
+        LOGGER.info("Deleted segments containing only invalid records for table: {}", tableNameWithType);
+      }
+
+      int numTasks = 0;
+      int maxTasks = getMaxTasks(taskType, tableNameWithType, taskConfigs);
+      for (SegmentZKMetadata segment : segmentSelectionResult.getSegmentsForCompaction()) {
+        if (numTasks == maxTasks) {
+          break;
+        }
+        Map<String, String> configs = new HashMap<>();
+        configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
+        configs.put(MinionConstants.SEGMENT_NAME_KEY, segment.getSegmentName());
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, segment.getDownloadUrl());
+        configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
+        pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
+        numTasks++;
+      }
+      LOGGER.info("Finished generating {} tasks configs for table: {}", numTasks, tableNameWithType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  /**
+   * Splits the segments reported by the servers into segments to compact and segments to delete.
+   *
+   * <p>A segment is deleted when all of its documents are invalid. Otherwise it is compacted when both its
+   * invalid record percentage and its invalid record count are strictly greater than the configured thresholds.
+   * Responses that cannot be parsed, and reported segments that are not part of {@code completedSegmentsMap},
+   * are skipped.
+   *
+   * @param taskConfigs task configs holding the optional thresholds
+   * @param completedSegmentsMap candidate segments keyed by segment name
+   * @param responseSet server responses, each value being a JSON array of validDocIdMetadata entries
+   */
+  @VisibleForTesting
+  public static SegmentSelectionResult processValidDocIdMetadata(Map<String, String> taskConfigs,
+      Map<String, SegmentZKMetadata> completedSegmentsMap, Set<Map.Entry<String, String>> responseSet) {
+    double invalidRecordsThresholdPercent = Double.parseDouble(
+        taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
+    long invalidRecordsThresholdCount = Long.parseLong(
+        taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
+    List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
+    List<String> segmentsForDeletion = new ArrayList<>();
+    for (Map.Entry<String, String> streamResponse : responseSet) {
+      JsonNode allValidDocIdMetadata;
+      try {
+        allValidDocIdMetadata = JsonUtils.stringToJsonNode(streamResponse.getValue());
+      } catch (IOException e) {
+        LOGGER.error("Unable to parse validDocIdMetadata response for: {}", streamResponse.getKey(), e);
+        continue;
+      }
+      Iterator<JsonNode> iterator = allValidDocIdMetadata.elements();
+      while (iterator.hasNext()) {
+        JsonNode validDocIdMetadata = iterator.next();
+        String segmentName = validDocIdMetadata.get("segmentName").asText();
+        SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+        if (segment == null) {
+          // The server reported a segment that is not a candidate; skip it instead of failing the whole generation
+          LOGGER.warn("Segment: {} is not in the list of completed segments, skipping it", segmentName);
+          continue;
+        }
+        long totalInvalidDocs = validDocIdMetadata.get("totalInvalidDocs").asLong();
+        long totalDocs = validDocIdMetadata.get("totalDocs").asLong();
+        // The equality check below runs first, so a NaN percentage (0 / 0) never takes part in a comparison
+        double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
+        if (totalInvalidDocs == totalDocs) {
+          segmentsForDeletion.add(segment.getSegmentName());
+        } else if (invalidRecordPercent > invalidRecordsThresholdPercent
+            && totalInvalidDocs > invalidRecordsThresholdCount) {
+          segmentsForCompaction.add(segment);
+        }
+      }
+    }
+    return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion);
+  }
+
+  /**
+   * Builds one validDocIdMetadata url per server, such that every completed segment is requested from exactly one
+   * of the servers hosting it. Servers without any remaining completed segment get no url.
+   */
+  @VisibleForTesting
+  public static List<String> getValidDocIdMetadataUrls(Map<String, List<String>> serverToSegments,
+      BiMap<String, String> serverToEndpoints, String tableNameWithType, Set<String> completedSegments)
+      throws URISyntaxException {
+    Set<String> remainingSegments = new HashSet<>(completedSegments);
+    List<String> urls = new ArrayList<>();
+    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
+      if (remainingSegments.isEmpty()) {
+        break;
+      }
+      String server = entry.getKey();
+      List<String> segmentNames = entry.getValue();
+      URIBuilder uriBuilder = new URIBuilder(serverToEndpoints.get(server)).setPath(
+          String.format("/tables/%s/validDocIdMetadata", tableNameWithType));
+      int completedSegmentCountPerServer = 0;
+      for (String segmentName : segmentNames) {
+        // remove() returns true only the first time, so a segment is requested from a single server
+        if (remainingSegments.remove(segmentName)) {
+          completedSegmentCountPerServer++;
+          uriBuilder.addParameter("segmentNames", segmentName);
+        }
+      }
+      if (completedSegmentCountPerServer > 0) {
+        // only add to the list if the server has completed segments
+        urls.add(uriBuilder.toString());
+      }
+    }
+    return urls;
+  }
+
+  /**
+   * Returns the segments of the table that are completed and whose end time is at least the buffer period in the
+   * past.
+   */
+  private List<SegmentZKMetadata> getCompletedSegments(String tableNameWithType, Map<String, String> taskConfigs) {
+    List<SegmentZKMetadata> completedSegments = new ArrayList<>();
+    String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+    long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+    // Read the clock once so that all segments are compared against the same cutoff
+    long latestEndTimeMs = System.currentTimeMillis() - bufferMs;
+    List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+    for (SegmentZKMetadata segment : allSegments) {
+      CommonConstants.Segment.Realtime.Status status = segment.getStatus();
+      // initial segments selection based on status and age
+      if (status.isCompleted() && segment.getEndTimeMs() <= latestEndTimeMs) {
+        completedSegments.add(segment);
+      }
+    }
+    return completedSegments;
+  }
+
+  /**
+   * Returns the configured max number of tasks for the table, or {@link Integer#MAX_VALUE} when it is not
+   * configured or cannot be parsed.
+   */
+  @VisibleForTesting
+  public static int getMaxTasks(String taskType, String tableNameWithType, Map<String, String> taskConfigs) {
+    int maxTasks = Integer.MAX_VALUE;
+    String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+    if (tableMaxNumTasksConfig != null) {
+      try {
+        maxTasks = Integer.parseInt(tableMaxNumTasksConfig);
+      } catch (Exception e) {
+        LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
+      }
+    }
+    return maxTasks;
+  }
+
+  /**
+   * Returns whether tasks can be generated for the table, i.e. it is not an offline table and has upsert enabled.
+   */
+  @VisibleForTesting
+  static boolean validate(TableConfig tableConfig) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    String tableNameWithType = tableConfig.getTableName();
+    if (tableConfig.getTableType() == TableType.OFFLINE) {
+      LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType,
+          tableNameWithType);
+      return false;
+    }
+    if (!tableConfig.isUpsertEnabled()) {
+      LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType,
+          tableNameWithType);
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java
new file mode 100644
index 0000000000..591a21b8ff
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+
+
+/**
+ * Progress observer factory for the upsert compaction task type. It only supplies the task type; everything else
+ * is inherited from {@link BaseMinionProgressObserverFactory}.
+ */
+@EventObserverFactory
+public class UpsertCompactionTaskProgressObserverFactory extends BaseMinionProgressObserverFactory {
+
+ @Override
+ public String getTaskType() {
+ return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+ }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
new file mode 100644
index 0000000000..604c58f6d0
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the server selection of {@link UpsertCompactionTaskExecutor}.
+ */
+public class UpsertCompactionTaskExecutorTest {
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final String CLUSTER_NAME = "testCluster";
+
+  @Test
+  public void testGetServer() {
+    // External view with a single replica of the segment, hosted ONLINE on server1
+    Map<String, String> instanceStates = new HashMap<>();
+    instanceStates.put("server1", SegmentStateModel.ONLINE);
+    ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+    externalView.getRecord().getMapFields().put(SEGMENT_NAME, instanceStates);
+
+    // Helix mocks serving that external view
+    HelixAdmin helixAdmin = Mockito.mock(HelixAdmin.class);
+    Mockito.when(helixAdmin.getResourceExternalView(CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(externalView);
+    HelixManager helixManager = Mockito.mock(HelixManager.class);
+    Mockito.when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+    Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+
+    // Must be set before the executor class is first referenced
+    MinionContext.getInstance().setHelixManager(helixManager);
+
+    Assert.assertEquals(UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME), "server1");
+
+    // verify exception thrown with OFFLINE server
+    instanceStates.put("server1", SegmentStateModel.OFFLINE);
+    Assert.assertThrows(IllegalStateException.class,
+        () -> UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME));
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
new file mode 100644
index 0000000000..9c93979d9f
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import java.net.URISyntaxException;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class UpsertCompactionTaskGeneratorTest {
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+ private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+ private UpsertCompactionTaskGenerator _taskGenerator;
+ private TableConfig _tableConfig;
+ private ClusterInfoAccessor _mockClusterInfoAccessor;
+ private SegmentZKMetadata _completedSegment;
+ private SegmentZKMetadata _completedSegment2;
+ private Map<String, SegmentZKMetadata> _completedSegmentsMap;
+
+ @BeforeClass
+ public void setUp() {
+ _taskGenerator = new UpsertCompactionTaskGenerator();
+ Map<String, Map<String, String>> tableTaskConfigs = new HashMap<>();
+ Map<String, String> compactionConfigs = new HashMap<>();
+ tableTaskConfigs.put(UpsertCompactionTask.TASK_TYPE, compactionConfigs);
+ _tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+ .setTaskConfig(new TableTaskConfig(tableTaskConfigs)).build();
+ _mockClusterInfoAccessor = mock(ClusterInfoAccessor.class);
+
+ _completedSegment = new SegmentZKMetadata("testTable__0");
+ _completedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ _completedSegment.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("2d"));
+ _completedSegment.setEndTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d"));
+ _completedSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+ _completedSegment.setTotalDocs(100L);
+
+ _completedSegment2 = new SegmentZKMetadata("testTable__1");
+ _completedSegment2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ _completedSegment2.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d"));
+ _completedSegment2.setEndTime(System.currentTimeMillis());
+ _completedSegment2.setTimeUnit(TimeUnit.MILLISECONDS);
+ _completedSegment2.setTotalDocs(10L);
+
+ _completedSegmentsMap = new HashMap<>();
+ _completedSegmentsMap.put(_completedSegment.getSegmentName(), _completedSegment);
+ _completedSegmentsMap.put(_completedSegment2.getSegmentName(), _completedSegment2);
+ }
+
+ @Test
+ public void testValidate() {
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ .build();
+ assertFalse(UpsertCompactionTaskGenerator.validate(tableConfig));
+
+ TableConfigBuilder tableConfigBuilder =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME);
+ assertFalse(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build()));
+
+ tableConfigBuilder = tableConfigBuilder.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL));
+ assertTrue(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build()));
+ }
+
+ @Test
+ public void testGenerateTasksValidatesTableConfigs() {
+ UpsertCompactionTaskGenerator taskGenerator = new UpsertCompactionTaskGenerator();
+ TableConfig offlineTableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ .build();
+ List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(Lists.newArrayList(offlineTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ TableConfig realtimeTableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ .build();
+ pinotTaskConfigs = taskGenerator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+ }
+
+ @Test
+ public void testGenerateTasksWithNoSegments() {
+ when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+ Lists.newArrayList(Collections.emptyList()));
+ _taskGenerator.init(_mockClusterInfoAccessor);
+
+ List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
+
+ assertEquals(pinotTaskConfigs.size(), 0);
+ }
+
+ @Test
+ public void testGenerateTasksWithConsumingSegment() {
+ SegmentZKMetadata consumingSegment = new SegmentZKMetadata("testTable__0");
+ consumingSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+ when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+ Lists.newArrayList(consumingSegment));
+ _taskGenerator.init(_mockClusterInfoAccessor);
+
+ List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
+
+ assertEquals(pinotTaskConfigs.size(), 0);
+ }
+
+ @Test
+ public void testGenerateTasksWithNewlyCompletedSegment() {
+ when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+ Lists.newArrayList(_completedSegment));
+ _taskGenerator.init(_mockClusterInfoAccessor);
+
+ List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
+
+ assertEquals(pinotTaskConfigs.size(), 0);
+ }
+
+ @Test
+ public void testGetValidDocIdMetadataUrls()
+ throws URISyntaxException {
+ Map<String, List<String>> serverToSegments = new HashMap<>();
+ serverToSegments.put("server1",
+ Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName()));
+ serverToSegments.put("server2", Lists.newArrayList("consumingSegment"));
+ BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
+ serverToEndpoints.put("server1", "http://endpoint1");
+ serverToEndpoints.put("server2", "http://endpoint2");
+ Set<String> completedSegments = new HashSet<>();
+ completedSegments.add(_completedSegment.getSegmentName());
+ completedSegments.add(_completedSegment2.getSegmentName());
+
+ List<String> validDocIdUrls =
+ UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints,
+ REALTIME_TABLE_NAME, completedSegments);
+
+ String expectedUrl =
+ String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1",
+ REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), _completedSegment2.getSegmentName());
+ assertEquals(validDocIdUrls.get(0), expectedUrl);
+ assertEquals(validDocIdUrls.size(), 1);
+ }
+
+ @Test
+ public void testGetValidDocIdMetadataUrlsWithReplicatedSegments()
+ throws URISyntaxException {
+ Map<String, List<String>> serverToSegments = new LinkedHashMap<>();
+ serverToSegments.put("server1",
+ Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName()));
+ serverToSegments.put("server2",
+ Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName()));
+ BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
+ serverToEndpoints.put("server1", "http://endpoint1");
+ serverToEndpoints.put("server2", "http://endpoint2");
+ Set<String> completedSegments = new HashSet<>();
+ completedSegments.add(_completedSegment.getSegmentName());
+ completedSegments.add(_completedSegment2.getSegmentName());
+
+ List<String> validDocIdUrls =
+ UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints,
+ REALTIME_TABLE_NAME, completedSegments);
+
+ String expectedUrl =
+ String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1",
+ REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), _completedSegment2.getSegmentName());
+ assertEquals(validDocIdUrls.get(0), expectedUrl);
+ assertEquals(validDocIdUrls.size(), 1);
+ }
+
+ @Test
+ public void testGetMaxTasks() {
+ Map<String, String> taskConfigs = new HashMap<>();
+ taskConfigs.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, "10");
+
+ int maxTasks =
+ UpsertCompactionTaskGenerator.getMaxTasks(UpsertCompactionTask.TASK_TYPE, REALTIME_TABLE_NAME, taskConfigs);
+
+ assertEquals(maxTasks, 10);
+ }
+
+ @Test
+ public void testProcessValidDocIdMetadata() {
+ Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
+ Set<Map.Entry<String, String>> responseSet = new HashSet<>();
+ String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
+ + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + "}," + "{" + "\"totalValidDocs\" : 0,"
+ + "\"totalInvalidDocs\" : 10," + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\","
+ + "\"totalDocs\" : 10" + "}]";
+ responseSet.add(new AbstractMap.SimpleEntry<>("", json));
+ UpsertCompactionTaskGenerator.SegmentSelectionResult segmentSelectionResult =
+ UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+ _completedSegment.getSegmentName());
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
+
+ // test with a higher invalidRecordsThresholdPercent
+ compactionConfigs = getCompactionConfigs("60", "10");
+ segmentSelectionResult =
+ UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+ assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
+
+ // test without an invalidRecordsThresholdPercent
+ compactionConfigs = getCompactionConfigs("0", "10");
+ segmentSelectionResult =
+ UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+ _completedSegment.getSegmentName());
+
+ // test without a invalidRecordsThresholdCount
+ compactionConfigs = getCompactionConfigs("30", "0");
+ segmentSelectionResult =
+ UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+ _completedSegment.getSegmentName());
+ }
+
+ private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent,
+ String invalidRecordsThresholdCount) {
+ Map<String, String> compactionConfigs = new HashMap<>();
+ if (!invalidRecordsThresholdPercent.equals("0")) {
+ compactionConfigs.put(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, invalidRecordsThresholdPercent);
+ }
+ if (!invalidRecordsThresholdCount.equals("0")) {
+ compactionConfigs.put(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, invalidRecordsThresholdCount);
+ }
+ return compactionConfigs;
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index d5741fa0b7..a64170c60f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -100,6 +100,7 @@ public final class TableConfigUtils {
// supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}.
private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+ private static final String UPSERT_COMPACTION_TASK_TYPE = "UpsertCompactionTask";
// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
@@ -543,6 +544,33 @@ public final class TableConfigUtils {
}
}
}
+ } else if (taskTypeConfigName.equals(UPSERT_COMPACTION_TASK_TYPE)) {
+ // check table is realtime
+ Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+ "UpsertCompactionTask only supports realtime tables!");
+ // check upsert enabled
+ Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for UpsertCompactionTask");
+
+ // check no malformed period
+ if (taskTypeConfig.containsKey("bufferTimePeriod")) {
+ TimeUtils.convertPeriodToMillis(taskTypeConfig.get("bufferTimePeriod"));
+ }
+ // check invalidRecordsThresholdPercent
+ if (taskTypeConfig.containsKey("invalidRecordsThresholdPercent")) {
+ Preconditions.checkState(Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) > 0
+ && Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) <= 100,
+ "invalidRecordsThresholdPercent must be > 0 and <= 100");
+ }
+ // check invalidRecordsThresholdCount
+ if (taskTypeConfig.containsKey("invalidRecordsThresholdCount")) {
+ Preconditions.checkState(Long.parseLong(taskTypeConfig.get("invalidRecordsThresholdCount")) >= 1,
+ "invalidRecordsThresholdCount must be >= 1");
+ }
+ // check that either invalidRecordsThresholdPercent or invalidRecordsThresholdCount was provided
+ Preconditions.checkState(
+ taskTypeConfig.containsKey("invalidRecordsThresholdPercent") || taskTypeConfig.containsKey(
+ "invalidRecordsThresholdCount"),
+ "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided");
}
}
}
@@ -581,8 +609,8 @@ public final class TableConfigUtils {
Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType(),
"Upsert/Dedup table must use low-level streaming consumer type");
// replica group is configured for routing
- Preconditions.checkState(tableConfig.getRoutingConfig() != null
- && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
+ Preconditions.checkState(
+ tableConfig.getRoutingConfig() != null && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
"Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing");
// specifically for upsert
@@ -649,8 +677,8 @@ public final class TableConfigUtils {
*/
@VisibleForTesting
static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) {
- if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap())
- || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
+ if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap()) || MapUtils.isEmpty(
+ tableConfig.getInstanceAssignmentConfigMap())) {
return;
}
for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) {
@@ -668,11 +696,11 @@ public final class TableConfigUtils {
*/
@VisibleForTesting
static void validatePartitionedReplicaGroupInstance(TableConfig tableConfig) {
- if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null
- || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
+ if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null || MapUtils.isEmpty(
+ tableConfig.getInstanceAssignmentConfigMap())) {
return;
}
- for (Map.Entry<String, InstanceAssignmentConfig> entry: tableConfig.getInstanceAssignmentConfigMap().entrySet()) {
+ for (Map.Entry<String, InstanceAssignmentConfig> entry : tableConfig.getInstanceAssignmentConfigMap().entrySet()) {
boolean isNullReplicaGroupPartitionConfig = entry.getValue().getReplicaGroupPartitionConfig() == null;
Preconditions.checkState(isNullReplicaGroupPartitionConfig,
"Both replicaGroupStrategyConfig and replicaGroupPartitionConfig is provided");
@@ -1025,8 +1053,8 @@ public final class TableConfigUtils {
return;
}
- boolean forwardIndexDisabled =
- Boolean.parseBoolean(fieldConfigProperties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED,
+ boolean forwardIndexDisabled = Boolean.parseBoolean(
+ fieldConfigProperties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED,
FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
if (!forwardIndexDisabled) {
return;
@@ -1045,20 +1073,23 @@ public final class TableConfigUtils {
}
Preconditions.checkState(
- !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(),
- String.format("Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)"
- + " not supported with forward index for column: %s, disabled", columnName));
+ !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(), String.format(
+ "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)"
+ + " not supported with forward index for column: %s, disabled", columnName));
- boolean hasDictionary = fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY
- || noDictionaryColumns == null || !noDictionaryColumns.contains(columnName);
- boolean hasInvertedIndex = indexingConfigs.getInvertedIndexColumns() != null
- && indexingConfigs.getInvertedIndexColumns().contains(columnName);
+ boolean hasDictionary =
+ fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY || noDictionaryColumns == null
+ || !noDictionaryColumns.contains(columnName);
+ boolean hasInvertedIndex =
+ indexingConfigs.getInvertedIndexColumns() != null && indexingConfigs.getInvertedIndexColumns()
+ .contains(columnName);
if (!hasDictionary || !hasInvertedIndex) {
LOGGER.warn("Forward index has been disabled for column {}. Either dictionary ({}) and / or inverted index ({}) "
+ "has been disabled. If the forward index needs to be regenerated or another index added please refresh "
+ "or back-fill the forward index as it cannot be rebuilt without dictionary and inverted index.",
- columnName, hasDictionary ? "enabled" : "disabled", hasInvertedIndex ? "enabled" : "disabled");
+ columnName,
+ hasDictionary ? "enabled" : "disabled", hasInvertedIndex ? "enabled" : "disabled");
}
}
@@ -1271,9 +1302,7 @@ public final class TableConfigUtils {
if (clone.getFieldConfigList() != null) {
List<FieldConfig> cleanFieldConfigList = new ArrayList<>();
for (FieldConfig fieldConfig : clone.getFieldConfigList()) {
- cleanFieldConfigList.add(new FieldConfig.Builder(fieldConfig)
- .withIndexTypes(null)
- .build());
+ cleanFieldConfigList.add(new FieldConfig.Builder(fieldConfig).withIndexTypes(null).build());
}
clone.setFieldConfigList(cleanFieldConfigList);
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 1f6834f62c..9d58b42161 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -924,8 +924,8 @@ public class TableConfigUtilsTest {
// be rebuilt without a dictionary, the constraint to have a dictionary has been lifted.
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null,
- fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null, fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
@@ -938,8 +938,8 @@ public class TableConfigUtilsTest {
// lifted.
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null,
- fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null, fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
@@ -964,8 +964,9 @@ public class TableConfigUtilsTest {
// Enable forward index disabled flag for a column with inverted index
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
- FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, null, null,
+ null, fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
@@ -979,8 +980,9 @@ public class TableConfigUtilsTest {
// Enable forward index disabled flag for a column with inverted index and is sorted
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
- FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, null, null,
+ null, fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
@@ -994,9 +996,10 @@ public class TableConfigUtilsTest {
// Enable forward index disabled flag for a multi-value column with inverted index and range index
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
- FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
- null, null, fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED,
+ Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, null,
+ fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for MV myCol2 with forward index disabled but has range and inverted index");
@@ -1011,9 +1014,10 @@ public class TableConfigUtilsTest {
// Enable forward index disabled flag for a singe-value column with inverted index and range index v1
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY,
- FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
- null, null, fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED,
+ Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, null,
+ fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
tableConfig.getIndexingConfig().setRangeIndexVersion(1);
TableConfigUtils.validate(tableConfig, schema);
@@ -1030,14 +1034,15 @@ public class TableConfigUtilsTest {
// Enable forward index disabled flag for a column with inverted index and disable dictionary
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW,
- FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.INVERTED, null, null, null,
+ fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should not be able to disable dictionary but keep inverted index");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "Cannot create an Inverted index on column myCol2 specified in the "
- + "noDictionaryColumns config");
+ Assert.assertEquals(e.getMessage(),
+ "Cannot create an Inverted index on column myCol2 specified in the " + "noDictionaryColumns config");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1046,8 +1051,9 @@ public class TableConfigUtilsTest {
// Enable forward index disabled flag for a column with FST index and disable dictionary
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW,
- FieldConfig.IndexType.FST, null, null, null, fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null, null, null,
+ fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should not be able to disable dictionary but keep inverted index");
@@ -1061,8 +1067,9 @@ public class TableConfigUtilsTest {
// Enable forward index disabled flag for a column with FST index and disable dictionary
Map<String, String> fieldConfigProperties = new HashMap<>();
fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
- FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.RAW,
- FieldConfig.IndexType.RANGE, null, null, null, fieldConfigProperties);
+ FieldConfig fieldConfig =
+ new FieldConfig("intCol", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.RANGE, null, null, null,
+ fieldConfigProperties);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
@@ -1415,8 +1422,7 @@ public class TableConfigUtilsTest {
@Test
public void testValidateUpsertConfig() {
Schema schema =
- new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.build();
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
TableConfig tableConfig =
@@ -1526,12 +1532,10 @@ public class TableConfigUtilsTest {
// Table upsert with delete column
String incorrectTypeDelCol = "incorrectTypeDeleteCol";
String delCol = "myDelCol";
- schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
.addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.addSingleValueDimension(incorrectTypeDelCol, FieldSpec.DataType.STRING)
- .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN)
- .build();
+ .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
streamConfigs = getStreamConfigs();
streamConfigs.put("stream.kafka.consumer.type", "simple");
@@ -1783,16 +1787,14 @@ public class TableConfigUtilsTest {
InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);
TableConfig tableConfigWithoutInstancePartitionsMap =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .build();
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
// Call validate with a table-config without any instance partitions or instance assignment config
TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithoutInstancePartitionsMap);
TableConfig tableConfigWithInstancePartitionsMap =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
- .build();
+ .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE")).build();
// Call validate with a table-config with instance partitions set but not instance assignment config
TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap);
@@ -1800,8 +1802,7 @@ public class TableConfigUtilsTest {
TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
.setInstanceAssignmentConfigMap(
- ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig))
- .build();
+ ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
try {
// Call validate with instance partitions and config set for the same type
TableConfigUtils.validateInstancePartitionsTypeMapConfig(invalidTableConfig);
@@ -1895,15 +1896,64 @@ public class TableConfigUtilsTest {
}
}
+  /**
+   * Exercises {@link TableConfigUtils#validateTaskConfigs} with "UpsertCompactionTask" settings: one configuration
+   * that must be accepted, followed by configurations that must be rejected with an
+   * {@link IllegalStateException}.
+   */
+  @Test
+  public void testUpsertCompactionTaskConfig() {
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+
+    // buffer period plus both thresholds: validation has to return without throwing
+    TableConfig validTableConfig = createUpsertCompactionTableConfig(
+        ImmutableMap.of("bufferTimePeriod", "5d", "invalidRecordsThresholdPercent", "1",
+            "invalidRecordsThresholdCount", "1"));
+    TableConfigUtils.validateTaskConfigs(validTableConfig, schema);
+
+    // invalidRecordsThresholdPercent of "0" is rejected
+    TableConfig zeroPercentTableConfig =
+        createUpsertCompactionTableConfig(ImmutableMap.of("invalidRecordsThresholdPercent", "0"));
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(zeroPercentTableConfig, schema));
+
+    // invalidRecordsThresholdPercent of "110" is rejected
+    TableConfig hundredTenPercentTableConfig =
+        createUpsertCompactionTableConfig(ImmutableMap.of("invalidRecordsThresholdPercent", "110"));
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(hundredTenPercentTableConfig, schema));
+
+    // invalidRecordsThresholdCount of "0" is rejected
+    TableConfig invalidCountTableConfig =
+        createUpsertCompactionTableConfig(ImmutableMap.of("invalidRecordsThresholdCount", "0"));
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(invalidCountTableConfig, schema));
+
+    // neither invalidRecordsThresholdPercent nor invalidRecordsThresholdCount present is rejected
+    TableConfig invalidTableConfig = createUpsertCompactionTableConfig(ImmutableMap.of("bufferTimePeriod", "5d"));
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(invalidTableConfig, schema));
+  }
+
+  /**
+   * Builds a REALTIME table config for {@code TABLE_NAME} in full upsert mode whose task config contains a single
+   * "UpsertCompactionTask" entry with the given settings.
+   */
+  private static TableConfig createUpsertCompactionTableConfig(Map<String, String> taskSettings) {
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", taskSettings)))
+        .build();
+  }
+
@Test
public void testValidatePartitionedReplicaGroupInstance() {
String partitionColumn = "testPartitionCol";
- ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
- new ReplicaGroupStrategyConfig(partitionColumn, 2);
+ ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig(partitionColumn, 2);
TableConfig tableConfigWithoutReplicaGroupStrategyConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .build();
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
// Call validate with a table-config without replicaGroupStrategyConfig or replicaGroupPartitionConfig.
TableConfigUtils.validatePartitionedReplicaGroupInstance(tableConfigWithoutReplicaGroupStrategyConfig);
@@ -1918,12 +1968,12 @@ public class TableConfigUtilsTest {
InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 2, 0, false, partitionColumn);
- Mockito.doReturn(instanceReplicaGroupPartitionConfig)
- .when(instanceAssignmentConfig).getReplicaGroupPartitionConfig();
+ Mockito.doReturn(instanceReplicaGroupPartitionConfig).when(instanceAssignmentConfig)
+ .getReplicaGroupPartitionConfig();
- TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE)
- .setTableName(TABLE_NAME).setInstanceAssignmentConfigMap(
- ImmutableMap.of(TableType.OFFLINE.toString(), instanceAssignmentConfig)).build();
+ TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setInstanceAssignmentConfigMap(ImmutableMap.of(TableType.OFFLINE.toString(), instanceAssignmentConfig))
+ .build();
invalidTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
try {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 8148956ae6..c03328c2bb 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -71,6 +71,7 @@ import org.apache.pinot.common.restlet.resources.TablesList;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -164,8 +165,8 @@ public class TablesResource {
@Encoded
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/{tableName}/metadata")
- @ApiOperation(value = "List metadata for all segments of a given table",
- notes = "List segments metadata of table hosted on this server")
+ @ApiOperation(value = "List metadata for all segments of a given table", notes = "List segments metadata of table "
+ + "hosted on this server")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error"),
@@ -174,7 +175,7 @@ public class TablesResource {
public String getSegmentMetadata(
@ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("")
- List<String> columns)
+ List<String> columns)
throws WebApplicationException {
InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
@@ -299,7 +300,7 @@ public class TablesResource {
@PathParam("tableName") String tableName,
@ApiParam(value = "Segment name", required = true) @PathParam("segmentName") String segmentName,
@ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("")
- List<String> columns) {
+ List<String> columns) {
for (int i = 0; i < columns.size(); i++) {
try {
columns.set(i, URLDecoder.decode(columns.get(i), StandardCharsets.UTF_8.name()));
@@ -348,8 +349,8 @@ public class TablesResource {
try {
Map<String, String> segmentCrcForTable = new HashMap<>();
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- segmentCrcForTable
- .put(segmentDataManager.getSegmentName(), segmentDataManager.getSegment().getSegmentMetadata().getCrc());
+ segmentCrcForTable.put(segmentDataManager.getSegmentName(),
+ segmentDataManager.getSegment().getSegmentMetadata().getCrc());
}
return ResourceUtils.convertToJsonString(segmentCrcForTable);
} catch (Exception e) {
@@ -429,6 +430,7 @@ public class TablesResource {
@PathParam("tableNameWithType") String tableNameWithType,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@Context HttpHeaders httpHeaders) {
+ segmentName = URIUtils.decode(segmentName);
LOGGER.info("Received a request to download validDocIds for segment {} table {}", segmentName, tableNameWithType);
// Validate data access
ServerResourceUtils.validateDataAccess(_accessControlFactory, tableNameWithType, httpHeaders);
@@ -466,6 +468,60 @@ public class TablesResource {
}
}
+  /**
+   * Reports, for every requested segment of the table, its total document count and how many of those documents
+   * are valid and invalid according to the segment's validDocIds bitmap.
+   *
+   * @param tableNameWithType table name including type
+   * @param segmentNames names of the segments to report on
+   * @return JSON array with one object per segment holding "segmentName", "totalDocs", "totalValidDocs" and
+   *         "totalInvalidDocs"
+   * @throws WebApplicationException with NOT_FOUND when a requested segment is missing or has no validDocIds, and
+   *         with BAD_REQUEST when a segment is not an {@link ImmutableSegmentImpl}
+   */
+  @GET
+  @Path("/tables/{tableNameWithType}/validDocIdMetadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provides segment validDocId metadata", notes = "Provides segment validDocId metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class)
+  })
+  public String getValidDocIdMetadata(
+      @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Segment name", allowMultiple = true, required = true) @QueryParam("segmentNames")
+      List<String> segmentNames) {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+    List<String> missingSegments = new ArrayList<>();
+    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentNames, missingSegments);
+    List<Map<String, Object>> allValidDocIdMetadata = new ArrayList<>(segmentDataManagers.size());
+    try {
+      if (!missingSegments.isEmpty()) {
+        throw new WebApplicationException(String.format("Table %s has missing segments", tableNameWithType),
+            Response.Status.NOT_FOUND);
+      }
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        IndexSegment indexSegment = segmentDataManager.getSegment();
+        if (!(indexSegment instanceof ImmutableSegmentImpl)) {
+          throw new WebApplicationException(
+              String.format("Table %s segment %s is not an immutable segment", tableNameWithType,
+                  segmentDataManager.getSegmentName()), Response.Status.BAD_REQUEST);
+        }
+        MutableRoaringBitmap validDocIds =
+            indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+        if (validDocIds == null) {
+          throw new WebApplicationException(
+              String.format("Missing validDocIds for table %s segment %s", tableNameWithType,
+                  segmentDataManager.getSegmentName()), Response.Status.NOT_FOUND);
+        }
+        int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
+        int totalValidDocs = validDocIds.getCardinality();
+        int totalInvalidDocs = totalDocs - totalValidDocs;
+        Map<String, Object> validDocIdMetadata = new HashMap<>();
+        validDocIdMetadata.put("segmentName", segmentDataManager.getSegmentName());
+        validDocIdMetadata.put("totalDocs", totalDocs);
+        validDocIdMetadata.put("totalValidDocs", totalValidDocs);
+        validDocIdMetadata.put("totalInvalidDocs", totalInvalidDocs);
+        allValidDocIdMetadata.add(validDocIdMetadata);
+      }
+    } finally {
+      // Release everything that was acquired, including on the missing-segment path and when a failure on one
+      // segment aborts the loop before the remaining segments are visited.
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        tableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+    return ResourceUtils.convertToJsonString(allValidDocIdMetadata);
+  }
+
/**
* Upload a low level consumer segment to segment store and return the segment download url. This endpoint is used
* when segment store copy is unavailable for committed low level consumer segments.
@@ -493,7 +549,7 @@ public class TablesResource {
})
public String uploadLLCSegment(
@ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName")
- String realtimeTableName,
+ String realtimeTableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,
@QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs)
throws Exception {
@@ -561,13 +617,13 @@ public class TablesResource {
@GET
@Path("tables/{realtimeTableName}/consumingSegmentsInfo")
@Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Get the info for consumers of this REALTIME table",
- notes = "Get consumers info from the table data manager. Note that the partitionToOffsetMap has been deprecated "
+ @ApiOperation(value = "Get the info for consumers of this REALTIME table", notes =
+ "Get consumers info from the table data manager. Note that the partitionToOffsetMap has been deprecated "
+ "and will be removed in the next release. The info is now embedded within each partition's state as "
+ "currentOffsetsMap")
public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
@ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName")
- String realtimeTableName) {
+ String realtimeTableName) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
if (TableType.OFFLINE == tableType) {
throw new WebApplicationException("Cannot get consuming segment info for OFFLINE table: " + realtimeTableName);
@@ -590,18 +646,14 @@ public class TablesResource {
recordsLagMap.put(k, v.getRecordsLag());
availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
});
- @Deprecated Map<String, String> partitiionToOffsetMap =
- realtimeSegmentDataManager.getPartitionToCurrentOffset();
- segmentConsumerInfoList.add(
- new SegmentConsumerInfo(segmentDataManager.getSegmentName(),
- realtimeSegmentDataManager.getConsumerState().toString(),
- realtimeSegmentDataManager.getLastConsumedTimestamp(),
- partitiionToOffsetMap,
- new SegmentConsumerInfo.PartitionOffsetInfo(
- partitiionToOffsetMap,
- partitionStateMap.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
- ), recordsLagMap, availabilityLagMsMap)));
+ @Deprecated
+ Map<String, String> partitiionToOffsetMap = realtimeSegmentDataManager.getPartitionToCurrentOffset();
+ segmentConsumerInfoList.add(new SegmentConsumerInfo(segmentDataManager.getSegmentName(),
+ realtimeSegmentDataManager.getConsumerState().toString(),
+ realtimeSegmentDataManager.getLastConsumedTimestamp(), partitiionToOffsetMap,
+ new SegmentConsumerInfo.PartitionOffsetInfo(partitiionToOffsetMap, partitionStateMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())),
+ recordsLagMap, availabilityLagMsMap)));
}
}
} catch (Exception e) {
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index bb912dd9e0..7e7c4b7fba 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -113,29 +113,30 @@ public class TablesResourceTest extends BaseResourceTest {
public void getTableMetadata()
throws Exception {
for (TableType tableType : TableType.values()) {
- String tableMetadataPath = "/tables/" + TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME)
- + "/metadata";
+ String tableMetadataPath =
+ "/tables/" + TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME) + "/metadata";
JsonNode jsonResponse =
JsonUtils.stringToJsonNode(_webTarget.path(tableMetadataPath).request().get(String.class));
TableMetadataInfo metadataInfo = JsonUtils.jsonNodeToObject(jsonResponse, TableMetadataInfo.class);
Assert.assertNotNull(metadataInfo);
- Assert.assertEquals(metadataInfo.getTableName(), TableNameBuilder.forType(tableType)
- .tableNameWithType(TABLE_NAME));
+ Assert.assertEquals(metadataInfo.getTableName(),
+ TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME));
Assert.assertEquals(metadataInfo.getColumnLengthMap().size(), 0);
Assert.assertEquals(metadataInfo.getColumnCardinalityMap().size(), 0);
Assert.assertEquals(metadataInfo.getColumnIndexSizeMap().size(), 0);
- jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(tableMetadataPath)
- .queryParam("columns", "column1").queryParam("columns", "column2").request().get(String.class));
+ jsonResponse = JsonUtils.stringToJsonNode(
+ _webTarget.path(tableMetadataPath).queryParam("columns", "column1").queryParam("columns", "column2").request()
+ .get(String.class));
metadataInfo = JsonUtils.jsonNodeToObject(jsonResponse, TableMetadataInfo.class);
Assert.assertEquals(metadataInfo.getColumnLengthMap().size(), 2);
Assert.assertEquals(metadataInfo.getColumnCardinalityMap().size(), 2);
Assert.assertEquals(metadataInfo.getColumnIndexSizeMap().size(), 2);
- Assert.assertTrue(metadataInfo.getColumnIndexSizeMap().get("column1")
- .containsKey(StandardIndexes.dictionary().getId()));
- Assert.assertTrue(metadataInfo.getColumnIndexSizeMap().get("column2")
- .containsKey(StandardIndexes.forward().getId()));
+ Assert.assertTrue(
+ metadataInfo.getColumnIndexSizeMap().get("column1").containsKey(StandardIndexes.dictionary().getId()));
+ Assert.assertTrue(
+ metadataInfo.getColumnIndexSizeMap().get("column2").containsKey(StandardIndexes.forward().getId()));
}
// No such table
@@ -148,9 +149,8 @@ public class TablesResourceTest extends BaseResourceTest {
public void testSegmentMetadata()
throws Exception {
IndexSegment defaultSegment = _realtimeIndexSegments.get(0);
- String segmentMetadataPath =
- "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/" + defaultSegment
- .getSegmentName() + "/metadata";
+ String segmentMetadataPath = "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/"
+ + defaultSegment.getSegmentName() + "/metadata";
JsonNode jsonResponse =
JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
@@ -188,8 +188,8 @@ public class TablesResourceTest extends BaseResourceTest {
.get(Response.class);
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
- response = _webTarget
- .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
+ response = _webTarget.path(
+ "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
.request().get(Response.class);
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}
@@ -229,8 +229,8 @@ public class TablesResourceTest extends BaseResourceTest {
Response response = _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname").request().get(Response.class);
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
- response = _webTarget
- .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
+ response = _webTarget.path(
+ "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
.request().get(Response.class);
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}
@@ -243,8 +243,8 @@ public class TablesResourceTest extends BaseResourceTest {
(ImmutableSegmentImpl) _realtimeIndexSegments.get(0));
// Verify non-existent table and segment download return NOT_FOUND status.
- Response response = _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname/validDocIds").request()
- .get(Response.class);
+ Response response =
+ _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname/validDocIds").request().get(Response.class);
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
response = _webTarget.path(
@@ -253,6 +253,26 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}
+  /**
+   * Requests validDocId metadata for a single realtime segment and verifies that exactly one entry comes back,
+   * that it names the requested segment, and that it carries the expected document counts.
+   */
+  @Test
+  public void testValidDocIdMetadata()
+      throws IOException {
+    IndexSegment segment = _realtimeIndexSegments.get(0);
+    // Verify the content of the downloaded snapshot from a realtime table.
+    // NOTE(review): presumably this helper is also what installs the validDocIds on the segment - confirm.
+    downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) segment);
+
+    String validDocIdMetadataPath =
+        "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/validDocIdMetadata";
+    String metadataResponse =
+        _webTarget.path(validDocIdMetadataPath).queryParam("segmentNames", segment.getSegmentName()).request()
+            .get(String.class);
+    JsonNode allValidDocIdMetadata = JsonUtils.stringToJsonNode(metadataResponse);
+    // One segment was requested, so the response array must hold exactly one entry
+    Assert.assertEquals(allValidDocIdMetadata.size(), 1);
+    JsonNode validDocIdMetadata = allValidDocIdMetadata.get(0);
+
+    Assert.assertEquals(validDocIdMetadata.get("segmentName").asText(), segment.getSegmentName());
+    Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000);
+    Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8);
+    Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 99992);
+  }
+
// Verify metadata file from segments.
private void downLoadAndVerifySegmentContent(String tableNameWithType, IndexSegment segment)
throws IOException {
@@ -290,7 +310,7 @@ public class TablesResourceTest extends BaseResourceTest {
PartitionUpsertMetadataManager upsertMetadataManager = mock(PartitionUpsertMetadataManager.class);
ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
int[] docIds = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
- for (int docId: docIds) {
+ for (int docId : docIds) {
validDocIds.add(docId);
}
segment.enableUpsert(upsertMetadataManager, validDocIds, null);
@@ -315,21 +335,21 @@ public class TablesResourceTest extends BaseResourceTest {
_realtimeIndexSegments);
// Verify segment uploading succeed.
- Response response = _webTarget.path(String
- .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+ Response response = _webTarget.path(
+ String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
Assert.assertEquals(response.readEntity(String.class), SEGMENT_DOWNLOAD_URL);
// Verify bad request: table type is offline
- response = _webTarget.path(String
- .format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
+ response = _webTarget.path(
+ String.format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
_offlineIndexSegments.get(0).getSegmentName())).request().post(null);
Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
// Verify bad request: segment is not low level consumer segment
- response = _webTarget.path(String
- .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+ response = _webTarget.path(
+ String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
_realtimeIndexSegments.get(0).getSegmentName())).request().post(null);
Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
@@ -349,9 +369,8 @@ public class TablesResourceTest extends BaseResourceTest {
public void testOfflineTableSegmentMetadata()
throws Exception {
IndexSegment defaultSegment = _offlineIndexSegments.get(0);
- String segmentMetadataPath =
- "/tables/" + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME) + "/segments/" + defaultSegment
- .getSegmentName() + "/metadata";
+ String segmentMetadataPath = "/tables/" + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME) + "/segments/"
+ + defaultSegment.getSegmentName() + "/metadata";
JsonNode jsonResponse =
JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
@@ -383,8 +402,8 @@ public class TablesResourceTest extends BaseResourceTest {
.get(Response.class);
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
- response = _webTarget
- .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
+ response = _webTarget.path(
+ "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
.request().get(Response.class);
Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org