You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/08 23:23:28 UTC
[incubator-pinot] 05/08: Use controller validation frequency as
default values for new periodic validation tasks
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 75259146d0c219e2372abdb6d18fe759cc1a49e4
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 7 18:31:25 2019 -0800
Use controller validation frequency as default values for new periodic validation tasks
---
.../linkedin/pinot/controller/ControllerConf.java | 45 +++++++++++-----------
.../realtime/PinotLLCRealtimeSegmentManager.java | 6 +--
.../RealtimeSegmentValidationManager.java | 2 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 26 ++++++-------
4 files changed, 40 insertions(+), 39 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index 9f84691..c6ff82e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -57,7 +57,9 @@ public class ControllerConf extends PropertiesConfiguration {
public static class ControllerPeriodicTasksConf {
private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
- private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
+ @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
+ private static final String DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
+ "controller.validation.frequencyInSeconds";
private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
"controller.offline.segment.interval.checker.frequencyInSeconds";
private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
@@ -76,7 +78,8 @@ public class ControllerConf extends PropertiesConfiguration {
"controller.segment.level.validation.intervalInSeconds";
private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
- private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+ @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
+ private static final int DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -353,34 +356,26 @@ public class ControllerConf extends PropertiesConfiguration {
Integer.toString(retentionFrequencyInSeconds));
}
- /**
- * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
- * @return
- */
- @Deprecated
- public int getValidationControllerFrequencyInSeconds() {
- if (containsKey(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
+ private int getValidationControllerFrequencyInSeconds() {
+ if (containsKey(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
return Integer.parseInt(
- (String) getProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+ (String) getProperty(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
}
- return ControllerPeriodicTasksConf.DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+ return ControllerPeriodicTasksConf.DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
}
/**
- * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
+ * Returns the config value for controller.offline.segment.interval.checker.frequencyInSeconds if it exists.
+ * If it doesn't exist, returns the segment level validation interval. This is done in order to retain the current behavior,
+ * wherein the offline validation tasks were done at segment level validation interval frequency
* @return
*/
- public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
- setProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
- Integer.toString(validationFrequencyInSeconds));
- }
-
public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
return Integer.parseInt(
(String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
}
- return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS;
+ return getSegmentLevelValidationIntervalInSeconds();
}
public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -388,13 +383,18 @@ public class ControllerConf extends PropertiesConfiguration {
Integer.toString(validationFrequencyInSeconds));
}
+ /**
+ * Returns the config value for controller.realtime.segment.validation.frequencyInSeconds if it exists.
+ * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+ * wherein the realtime validation tasks were done at validation controller frequency
+ * @return
+ */
public int getRealtimeSegmentValidationFrequencyInSeconds() {
-
if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) {
return Integer.parseInt(
(String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
}
- return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS;
+ return getValidationControllerFrequencyInSeconds();
}
public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -403,8 +403,9 @@ public class ControllerConf extends PropertiesConfiguration {
}
/**
- * Return broker resource validation frequency if present, else return the validation manager frequency
- * This is so that we can rollout with no config changes to the frequency of this task
+ * Returns the config value for controller.broker.resource.validation.frequencyInSeconds if it exists.
+ * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+ * wherein the broker resource validation tasks were done at validation controller frequency
* @return
*/
public int getBrokerResourceValidationFrequencyInSeconds() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e708abc..a77750e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -950,7 +950,7 @@ public class PinotLLCRealtimeSegmentManager {
*
* TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and reset it to 0 at the end of validateLLC
*/
- public void validateLLCSegments(final TableConfig tableConfig) {
+ public void ensureAllPartitionsConsuming(final TableConfig tableConfig) {
final String tableNameWithType = tableConfig.getTableName();
final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
final int partitionCount = getPartitionCount(streamConfig);
@@ -958,7 +958,7 @@ public class PinotLLCRealtimeSegmentManager {
@Nullable
@Override
public IdealState apply(@Nullable IdealState idealState) {
- return validateLLCSegments(tableConfig, idealState, partitionCount);
+ return ensureAllPartitionsConsuming(tableConfig, idealState, partitionCount);
}
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
}
@@ -1078,7 +1078,7 @@ public class PinotLLCRealtimeSegmentManager {
* TODO: split this method into multiple smaller methods
*/
@VisibleForTesting
- protected IdealState validateLLCSegments(final TableConfig tableConfig, IdealState idealState,
+ protected IdealState ensureAllPartitionsConsuming(final TableConfig tableConfig, IdealState idealState,
final int partitionCount) {
final String tableNameWithType = tableConfig.getTableName();
final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 36d7b27..77ed13a 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -92,7 +92,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
StreamConfig streamConfig = new StreamConfig(streamConfigMap);
if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
}
}
} catch (Exception e) {
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 36f3386..fd417a5 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -369,7 +369,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
oldMetadataMap.put(entry.getKey(), new LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord()));
}
segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
- IdealState updatedIdealState = segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ IdealState updatedIdealState = segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
Map<String, Map<String, String>> updatedMapFields = updatedIdealState.getRecord().getMapFields();
Map<String, LLCRealtimeSegmentZKMetadata> updatedMetadataMap = segmentManager._metadataMap;
@@ -525,13 +525,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
// validate that all entries in oldMapFields are unchanged in new ideal state
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
// verify that new segment gets created in ideal state with CONSUMING
Assert.assertNotNull(idealState.getRecord().getMapFields().get(llcSegmentName.getSegmentName()));
@@ -572,13 +572,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
// validate nothing changed and try again with disabled
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
// verify that new segment gets created in ideal state with CONSUMING and old segment ONLINE
Assert.assertNotNull(idealState.getRecord().getMapFields().get(latestSegment.getSegmentName()).values().
@@ -615,7 +615,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
(ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
} else {
@@ -644,12 +644,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
// validate nothing changed and try again with disabled
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
} else {
@@ -677,13 +677,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
// validate nothing changed and try again with disabled
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
} else {
@@ -695,7 +695,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
(ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
// verify that nothing changed
verifyNoChangeToOldEntries(oldMapFields, idealState);
@@ -830,7 +830,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
IdealState idealState = idealStateBuilder.clear().build();
segmentManager._metadataMap.clear();
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
PartitionAssignment partitionAssignment =
segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
idealState);
@@ -845,7 +845,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager, TableConfig tableConfig, int nPartitions) {
IdealState idealState = idealStateBuilder.clear().build();
segmentManager._metadataMap.clear();
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
return idealStateBuilder.build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org