You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/04 23:51:15 UTC
[incubator-pinot] 01/01: Split ValidationManager duties into
separate ControllerPeriodicTasks
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch split_validation_manager_periodic_tasks
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a786bb90e5fbf1617a338c36b416733a15beb1a5
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Jan 4 15:50:52 2019 -0800
Split ValidationManager duties into separate ControllerPeriodicTasks
---
.../linkedin/pinot/controller/ControllerConf.java | 135 +++++++++++------
.../pinot/controller/ControllerStarter.java | 41 ++++--
.../realtime/PinotLLCRealtimeSegmentManager.java | 4 +-
.../helix/core/retention/RetentionManager.java | 12 +-
.../BrokerResourceValidationManager.java | 80 ++++++++++
...ger.java => OfflineSegmentIntervalChecker.java} | 116 ++-------------
.../RealtimeSegmentValidationManager.java | 162 +++++++++++++++++++++
.../helix/core/retention/RetentionManagerTest.java | 11 +-
.../validation/ValidationManagerTest.java | 14 +-
.../tests/SegmentCompletionIntegrationTests.java | 4 +-
.../admin/command/StartControllerCommand.java | 4 +-
11 files changed, 406 insertions(+), 177 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index b391765..f2b7f9f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -51,15 +51,40 @@ public class ControllerConf extends PropertiesConfiguration {
private static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps";
private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
- private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
- private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
- private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
- private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "controller.realtime.segment.relocator.frequency";
- private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = "controller.statuschecker.waitForPushTimeInSeconds";
+
+ public static class ControllerPeriodicTasksConf {
+ private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
+ private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
+ "controller.offline.segment.interval.checker.frequencyInSeconds";
+ private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
+ "controller.realtime.segment.validation.frequencyInSeconds";
+ private static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
+ "controller.broker.resource.validation.frequencyInSeconds";
+ private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
+ private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
+ "controller.statuschecker.waitForPushTimeInSeconds";
+ private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
+ private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
+ "controller.realtime.segment.relocator.frequency";
+ // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
+ // separate interval
+ private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
+ "controller.segment.level.validation.intervalInSeconds";
+
+ private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+ private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+ private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+ private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+ private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
+ private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+ private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
+ private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
+ private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
+ }
+
private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds";
private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = "controller.realtime.segment.commit.timeoutSeconds";
private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = "controller.deleted.segments.retentionInDays";
- private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
private static final String TABLE_MIN_REPLICAS = "table.minReplicas";
private static final String ENABLE_SPLIT_COMMIT = "controller.enable.split.commit";
private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
@@ -70,25 +95,17 @@ public class ControllerConf extends PropertiesConfiguration {
private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = "controller.realtime.segment.metadata.commit.numLocks";
private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check";
- // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
- // separate interval
- private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
- "controller.segment.level.validation.intervalInSeconds";
+
private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode";
// Defines the kind of storage and the underlying PinotFS implementation
private static final String PINOT_FS_FACTORY_CLASS_PREFIX = "controller.storage.factory.class";
private static final String PINOT_FS_FACTORY_CLASS_LOCAL = "controller.storage.factory.class.file";
- private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
- private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
- private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
- private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
- private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+
private static final long DEFAULT_EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
private static final int DEFAULT_SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
- private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = false;
private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
@@ -98,7 +115,6 @@ public class ControllerConf extends PropertiesConfiguration {
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true;
- private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
@@ -321,58 +337,91 @@ public class ControllerConf extends PropertiesConfiguration {
}
public int getRetentionControllerFrequencyInSeconds() {
- if (containsKey(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
- return Integer.parseInt((String) getProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
+ if (containsKey(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
+ return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
}
- return DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
+ return ControllerPeriodicTasksConf.DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
}
public void setRetentionControllerFrequencyInSeconds(int retentionFrequencyInSeconds) {
- setProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(retentionFrequencyInSeconds));
+ setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS,
+ Integer.toString(retentionFrequencyInSeconds));
+ }
+
+ public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+ if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
+ return Integer.parseInt(
+ (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
+ }
+ return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS;
+ }
+
+ public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
+ setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
+ Integer.toString(validationFrequencyInSeconds));
+ }
+
+ public int getRealtimeSegmentValidationFrequencyInSeconds() {
+
+ if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) {
+ return Integer.parseInt(
+ (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
+ }
+ return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS;
+ }
+
+ public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+ setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
+ Integer.toString(validationFrequencyInSeconds));
}
- public int getValidationControllerFrequencyInSeconds() {
- if (containsKey(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
- return Integer.parseInt((String) getProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+ public int getBrokerResourceValidationFrequencyInSeconds() {
+ if (containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS)) {
+ return Integer.parseInt(
+ (String) getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
}
- return DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+ return ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS;
}
- public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
- setProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(validationFrequencyInSeconds));
+ public void setBrokerResourceValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+ setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
+ Integer.toString(validationFrequencyInSeconds));
}
public int getStatusCheckerFrequencyInSeconds() {
- if (containsKey(STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
- return Integer.parseInt((String) getProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS));
+ if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
+ return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
}
- return DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
+ return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
}
public void setStatusCheckerFrequencyInSeconds(int statusCheckerFrequencyInSeconds) {
- setProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS, Integer.toString(statusCheckerFrequencyInSeconds));
+ setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS,
+ Integer.toString(statusCheckerFrequencyInSeconds));
}
public String getRealtimeSegmentRelocatorFrequency() {
- if (containsKey(REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
- return (String) getProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
+ if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
+ return (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
}
- return DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
+ return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
}
public void setRealtimeSegmentRelocatorFrequency(String relocatorFrequency) {
- setProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
+ setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
}
public int getStatusCheckerWaitForPushTimeInSeconds() {
- if (containsKey(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
- return Integer.parseInt((String) getProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
+ if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
+ return Integer.parseInt(
+ (String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
}
- return DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
+ return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
}
public void setStatusCheckerWaitForPushTimeInSeconds(int statusCheckerWaitForPushTimeInSeconds) {
- setProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, Integer.toString(statusCheckerWaitForPushTimeInSeconds));
+ setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
+ Integer.toString(statusCheckerWaitForPushTimeInSeconds));
}
public long getExternalViewOnlineToOfflineTimeout() {
@@ -414,11 +463,12 @@ public class ControllerConf extends PropertiesConfiguration {
}
public int getTaskManagerFrequencyInSeconds() {
- return getInt(TASK_MANAGER_FREQUENCY_IN_SECONDS, DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
+ return getInt(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS,
+ ControllerPeriodicTasksConf.DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
}
public void setTaskManagerFrequencyInSeconds(int frequencyInSeconds) {
- setProperty(TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
+ setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
}
public int getDefaultTableMinReplicas() {
@@ -466,6 +516,7 @@ public class ControllerConf extends PropertiesConfiguration {
}
public int getSegmentLevelValidationIntervalInSeconds() {
- return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
+ return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+ ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
}
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 1e8552a..1f64920 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -38,7 +38,9 @@ import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentMan
import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
import com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.BrokerResourceValidationManager;
+import com.linkedin.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
import com.linkedin.pinot.core.periodictask.PeriodicTask;
import com.linkedin.pinot.filesystem.PinotFSFactory;
@@ -83,7 +85,9 @@ public class ControllerStarter {
private final ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
// Can only be constructed after resource manager getting started
- private ValidationManager _validationManager;
+ private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
+ private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
+ private BrokerResourceValidationManager _brokerResourceValidationManager;
private RealtimeSegmentRelocator _realtimeSegmentRelocator;
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotTaskManager _taskManager;
@@ -92,8 +96,7 @@ public class ControllerStarter {
_config = conf;
_adminApp = new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
_helixResourceManager = new PinotHelixResourceManager(_config);
- _retentionManager = new RetentionManager(_helixResourceManager, _config.getRetentionControllerFrequencyInSeconds(),
- _config.getDeletedSegmentsRetentionInDays());
+ _retentionManager = new RetentionManager(_helixResourceManager, _config);
_metricsRegistry = new MetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager);
@@ -108,8 +111,16 @@ public class ControllerStarter {
return _helixResourceManager;
}
- public ValidationManager getValidationManager() {
- return _validationManager;
+ public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
+ return _offlineSegmentIntervalChecker;
+ }
+
+ public RealtimeSegmentValidationManager getRealtimeSegmentValidationManager() {
+ return _realtimeSegmentValidationManager;
+ }
+
+ public BrokerResourceValidationManager getBrokerResourceValidationManager() {
+ return _brokerResourceValidationManager;
}
public PinotHelixTaskResourceManager getHelixTaskResourceManager() {
@@ -178,10 +189,18 @@ public class ControllerStarter {
_taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
periodicTasks.add(_taskManager);
periodicTasks.add(_retentionManager);
- _validationManager =
- new ValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
+ _offlineSegmentIntervalChecker =
+ new OfflineSegmentIntervalChecker(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
+ new ValidationMetrics(_metricsRegistry));
+ _realtimeSegmentValidationManager =
+ new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
new ValidationMetrics(_metricsRegistry));
- periodicTasks.add(_validationManager);
+ _brokerResourceValidationManager =
+ new BrokerResourceValidationManager(_config, _helixResourceManager);
+
+ periodicTasks.add(_offlineSegmentIntervalChecker);
+ periodicTasks.add(_realtimeSegmentValidationManager);
+ periodicTasks.add(_brokerResourceValidationManager);
periodicTasks.add(_segmentStatusChecker);
periodicTasks.add(_realtimeSegmentRelocator);
@@ -348,7 +367,9 @@ public class ControllerStarter {
conf.setControllerVipHost("localhost");
conf.setControllerVipProtocol("http");
conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
- conf.setValidationControllerFrequencyInSeconds(3600);
+ conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+ conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+ conf.setBrokerResourceValidationFrequencyInSeconds(3600);
conf.setStatusCheckerFrequencyInSeconds(5 * 60);
conf.setRealtimeSegmentRelocatorFrequency("1h");
conf.setStatusCheckerWaitForPushTimeInSeconds(10 * 60);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 486231a..7b5093e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -815,8 +815,8 @@ public class PinotLLCRealtimeSegmentManager {
/**
* An instance is reporting that it has stopped consuming a topic due to some error.
* Mark the state of the segment to be OFFLINE in idealstate.
- * When all replicas of this segment are marked offline, the ValidationManager, in its next
- * run, will auto-create a new segment with the appropriate offset.
+ * When all replicas of this segment are marked offline, the {@link com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager},
+ * in its next run, will auto-create a new segment with the appropriate offset.
*/
public void segmentStoppedConsuming(final LLCSegmentName segmentName, final String instance) {
String rawTableName = segmentName.getTableName();
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 134c4ee..36367f9 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -23,6 +23,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import com.linkedin.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -49,10 +50,9 @@ public class RetentionManager extends ControllerPeriodicTask {
private final int _deletedSegmentsRetentionInDays;
- public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
- int deletedSegmentsRetentionInDays) {
- super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
- _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+ public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+ super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+ _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
@@ -145,7 +145,7 @@ public class RetentionManager extends ControllerPeriodicTask {
// In progress segment, only check LLC segment
if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
// Delete old LLC segment that hangs around. Do not delete segment that are current since there may be a race
- // with ValidationManager trying to auto-create the LLC segment
+ // with RealtimeSegmentValidationManager trying to auto-create the LLC segment
if (shouldDeleteInProgressLLCSegment(segmentName, idealState, realtimeSegmentZKMetadata)) {
segmentsToDelete.add(segmentName);
}
@@ -169,7 +169,7 @@ public class RetentionManager extends ControllerPeriodicTask {
return false;
}
// delete a segment only if it is old enough (5 days) or else,
- // 1. latest segment could get deleted in the middle of repair by ValidationManager
+ // 1. latest segment could get deleted in the middle of repair by RealtimeSegmentValidationManager
// 2. for a brand new segment, if this code kicks in after new metadata is created but ideal state entry is not yet created (between step 2 and 3),
// the latest segment metadata could get marked for deletion
if (System.currentTimeMillis() - realtimeSegmentZKMetadata.getCreationTime()
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
new file mode 100644
index 0000000..e3264c4
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Rebuilds the broker resource if the instance set has changed
+ */
+public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
+
+ private List<InstanceConfig> _instanceConfigs;
+
+ public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
+ super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
+ pinotHelixResourceManager);
+ }
+
+ @Override
+ protected void preprocess() {
+ // Cache instance configs to reduce ZK access
+ _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+ }
+
+ @Override
+ protected void processTable(String tableNameWithType) {
+ try {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
+ return;
+ }
+
+ // Rebuild broker resource
+ Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+ tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
+ }
+ }
+
+
+ @Override
+ protected void postprocess() {
+
+ }
+
+ @Override
+ protected void initTask() {
+
+ }
+
+ @Override
+ public void stopTask() {
+ }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
similarity index 64%
rename from pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
rename to pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 7d06289..f35b287 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -16,29 +16,20 @@
package com.linkedin.pinot.controller.validation;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.metrics.ValidationMetrics;
import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.common.utils.HLCSegmentName;
-import com.linkedin.pinot.common.utils.SegmentName;
import com.linkedin.pinot.common.utils.time.TimeUtils;
import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.model.InstanceConfig;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.slf4j.Logger;
@@ -49,53 +40,26 @@ import org.slf4j.LoggerFactory;
* Manages the segment validation metrics, to ensure that all offline segments are contiguous (no missing segments) and
* that the offline push delay isn't too high.
*/
-public class ValidationManager extends ControllerPeriodicTask {
- private static final Logger LOGGER = LoggerFactory.getLogger(ValidationManager.class);
+public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
- private final int _segmentLevelValidationIntervalInSeconds;
- private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
- private final ValidationMetrics _validationMetrics;
+ protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+ protected final ValidationMetrics _validationMetrics;
- private long _lastSegmentLevelValidationTimeMs = 0L;
- private boolean _runSegmentLevelValidation;
- private List<InstanceConfig> _instanceConfigs;
-
- public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+ public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
- super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
- _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
- Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+ super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
+ pinotHelixResourceManager);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
}
@Override
protected void preprocess() {
- // Run segment level validation using a separate interval
- _runSegmentLevelValidation = false;
- long currentTimeMs = System.currentTimeMillis();
- if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs)
- >= _segmentLevelValidationIntervalInSeconds) {
- LOGGER.info("Run segment-level validation");
- _runSegmentLevelValidation = true;
- _lastSegmentLevelValidationTimeMs = currentTimeMs;
- }
-
- // Cache instance configs to reduce ZK access
- _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
}
@Override
protected void processTable(String tableNameWithType) {
- runValidation(tableNameWithType);
- }
-
- @Override
- protected void postprocess() {
-
- }
-
- private void runValidation(String tableNameWithType) {
try {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig == null) {
@@ -103,29 +67,12 @@ public class ValidationManager extends ControllerPeriodicTask {
return;
}
- // Rebuild broker resource
- Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
- tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
-
- // Perform validation based on the table type
CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
- if (_runSegmentLevelValidation) {
- validateOfflineSegmentPush(tableConfig);
- }
- } else {
- if (_runSegmentLevelValidation) {
- updateRealtimeDocumentCount(tableConfig);
- }
- Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
- StreamConfig streamConfig = new StreamConfig(streamConfigMap);
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
- }
+ validateOfflineSegmentPush(tableConfig);
}
} catch (Exception e) {
- LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
+ LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
}
}
@@ -261,50 +208,9 @@ public class ValidationManager extends ControllerPeriodicTask {
return numTotalDocs;
}
- private void updateRealtimeDocumentCount(TableConfig tableConfig) {
- String realtimeTableName = tableConfig.getTableName();
- List<RealtimeSegmentZKMetadata> metadataList =
- _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
- boolean countHLCSegments = true; // false if this table has ONLY LLC segments (i.e. fully migrated)
- StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
- if (streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType()) {
- countHLCSegments = false;
- }
- // Update the gauge to contain the total document count in the segments
- _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
- computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments));
- }
-
- @VisibleForTesting
- static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
- boolean countHLCSegments) {
- long numTotalDocs = 0;
-
- String groupId = "";
- for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) {
- String segmentName = realtimeSegmentZKMetadata.getSegmentName();
- if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
- if (countHLCSegments) {
- HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
- String segmentGroupIdName = hlcSegmentName.getGroupId();
-
- if (groupId.isEmpty()) {
- groupId = segmentGroupIdName;
- }
- // Discard all segments with different groupids as they are replicas
- if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
- numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
- }
- }
- } else {
- // Low level segments
- if (!countHLCSegments) {
- numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
- }
- }
- }
+ @Override
+ protected void postprocess() {
- return numTotalDocs;
}
@Override
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
new file mode 100644
index 0000000..71e50d2
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -0,0 +1,162 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import com.linkedin.pinot.common.metrics.ValidationMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.HLCSegmentName;
+import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Validates realtime ideal states and segment metadata, fixing any partitions which have stopped consuming
+ */
+public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+
+ protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+ protected final ValidationMetrics _validationMetrics;
+
+ private final int _segmentLevelValidationIntervalInSeconds;
+ private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
+ private boolean _updateRealtimeDocumentCount;
+
+  public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
+    super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _validationMetrics = validationMetrics;
+    // Interval that gates how often preprocess() enables the realtime document count update; must be positive
+    _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
+    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+  }
+
+  @Override
+  protected void preprocess() {
+    // Decide once per run whether the realtime document counts are due for an update: they are updated only
+    // when at least the segment-level validation interval has elapsed since the previous update.
+    long nowMs = System.currentTimeMillis();
+    long secondsSinceLastUpdate = TimeUnit.MILLISECONDS.toSeconds(nowMs - _lastUpdateRealtimeDocumentCountTimeMs);
+    _updateRealtimeDocumentCount = secondsSinceLastUpdate >= _segmentLevelValidationIntervalInSeconds;
+    if (_updateRealtimeDocumentCount) {
+      LOGGER.info("Run segment-level validation");
+      _lastUpdateRealtimeDocumentCountTimeMs = nowMs;
+    }
+  }
+
+  /** For realtime tables only: updates the document count gauge when due and validates LLC segments if used. */
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
+      }
+      // Stream configs are read for realtime tables only, matching the former ValidationManager behavior
+      if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == CommonConstants.Helix.TableType.REALTIME) {
+        if (_updateRealtimeDocumentCount) {
+          updateRealtimeDocumentCount(tableConfig);
+        }
+        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+        if (streamConfig.hasLowLevelConsumerType()) {
+          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
+    }
+  }
+
+  /**
+   * Publishes the total document count of the table's realtime segments to the total document count gauge.
+   */
+  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
+    String tableName = tableConfig.getTableName();
+    List<RealtimeSegmentZKMetadata> segmentMetadataList =
+        _pinotHelixResourceManager.getRealtimeSegmentMetadata(tableName);
+    StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
+    // HLC segments are left out only when the table is configured with the low level consumer type alone
+    boolean llcOnly = streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType();
+    _validationMetrics.updateTotalDocumentCountGauge(tableName,
+        computeRealtimeTotalDocumentInSegments(segmentMetadataList, !llcOnly));
+  }
+
+  /**
+   * Sums the raw document counts of the HLC segments (if {@code countHLCSegments}) or else of all other segments.
+   * For HLC, only segments sharing the group id of the first HLC segment seen, with non-negative counts, are added.
+   */
+  @VisibleForTesting
+  static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
+      boolean countHLCSegments) {
+    long totalDocs = 0;
+    String countedGroupId = "";
+    for (RealtimeSegmentZKMetadata segmentMetadata : realtimeSegmentZKMetadataList) {
+      String segmentName = segmentMetadata.getSegmentName();
+      boolean isHLCSegment = SegmentName.isHighLevelConsumerSegmentName(segmentName);
+      if (isHLCSegment != countHLCSegments) {
+        // Segment is not of the kind being counted
+        continue;
+      }
+      if (!isHLCSegment) {
+        totalDocs += segmentMetadata.getTotalRawDocs();
+        continue;
+      }
+      String segmentGroupId = new HLCSegmentName(segmentName).getGroupId();
+      if (countedGroupId.isEmpty()) {
+        countedGroupId = segmentGroupId;
+      }
+      // Segments with a different group id are replicas and must not be counted again
+      if (countedGroupId.equals(segmentGroupId) && segmentMetadata.getTotalRawDocs() >= 0) {
+        totalDocs += segmentMetadata.getTotalRawDocs();
+      }
+    }
+    return totalDocs;
+  }
+
+  @Override
+  protected void postprocess() {
+    // No-op: this task has no post-processing step
+  }
+
+  @Override
+  protected void initTask() {
+    // No-op: this task performs no initialization
+  }
+
+  @Override
+  public void stopTask() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics(); // NOTE(review): confirm this instance is not shared with other tasks
+  }
+}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index c27b0ef..6f384c7 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -23,6 +23,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.segment.SegmentMetadata;
import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import com.linkedin.pinot.controller.helix.core.SegmentDeletionManager;
@@ -84,7 +85,10 @@ public class RetentionManagerTest {
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
- RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+ ControllerConf conf = new ControllerConf();
+ conf.setRetentionControllerFrequencyInSeconds(0);
+ conf.setDeletedSegmentsRetentionInDays(0);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
retentionManager.init();
retentionManager.run();
@@ -201,7 +205,10 @@ public class RetentionManagerTest {
setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
- RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+ ControllerConf conf = new ControllerConf();
+ conf.setRetentionControllerFrequencyInSeconds(0);
+ conf.setDeletedSegmentsRetentionInDays(0);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
retentionManager.init();
retentionManager.run();
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
index 6d5394a..861623f 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
@@ -49,7 +49,7 @@ import static org.testng.Assert.*;
/**
- * Tests for the ValidationManager.
+ * Tests for the ValidationManagers.
*/
public class ValidationManagerTest {
private String HELIX_CLUSTER_NAME = "TestValidationManager";
@@ -175,7 +175,7 @@ public class ValidationManagerTest {
segmentZKMetadataList.add(
SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName4, 20));
- assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
+ assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
// Now add some low level segment names
String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0, 1000).getSegmentName();
@@ -185,7 +185,7 @@ public class ValidationManagerTest {
segmentZKMetadataList.add(SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName6, 5));
// Only the LLC segments should get counted.
- assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
+ assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
}
@AfterClass
@@ -207,18 +207,18 @@ public class ValidationManagerTest {
jan1st2nd3rd.add(jan1st);
jan1st2nd3rd.add(jan2nd);
jan1st2nd3rd.add(jan3rd);
- assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
+ assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
ArrayList<Interval> jan1st2nd3rd5th = new ArrayList<>(jan1st2nd3rd);
jan1st2nd3rd5th.add(jan5th);
- assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
+ assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
// Should also work if the intervals are in random order
ArrayList<Interval> jan5th2nd1st = new ArrayList<>();
jan5th2nd1st.add(jan5th);
jan5th2nd1st.add(jan2nd);
jan5th2nd1st.add(jan1st);
- assertEquals(ValidationManager.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
+ assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
// Should also work if the intervals are of different sizes
Interval jan1stAnd2nd = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new DateTime(2015, 1, 2, 23, 59, 59));
@@ -226,6 +226,6 @@ public class ValidationManagerTest {
jan1st2nd4th5th.add(jan1stAnd2nd);
jan1st2nd4th5th.add(jan4th);
jan1st2nd4th5th.add(jan5th);
- assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
+ assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
}
}
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index e7fb540..ac4aad0 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -25,7 +25,7 @@ import com.linkedin.pinot.common.utils.LLCSegmentName;
import com.linkedin.pinot.common.utils.NetUtil;
import com.linkedin.pinot.common.utils.ZkStarter;
import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
import com.linkedin.pinot.server.realtime.ControllerLeaderLocator;
import com.linkedin.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import com.linkedin.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
@@ -152,7 +152,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
final String oldSegment = _currentSegment;
// Now call the validation manager, and the segment should fix itself
- ValidationManager validationManager = _controllerStarter.getValidationManager();
+ RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager();
validationManager.init();
validationManager.run();
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
index e27e71a..3da214f 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
@@ -147,7 +147,9 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
conf.setTenantIsolationEnabled(_tenantIsolation);
conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
- conf.setValidationControllerFrequencyInSeconds(3600);
+ conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+ conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+ conf.setBrokerResourceValidationFrequencyInSeconds(3600);
}
LOGGER.info("Executing command: " + toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org