You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/08 23:23:28 UTC

[incubator-pinot] 05/08: Use controller validation frequency as default values for new periodic validation tasks

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 75259146d0c219e2372abdb6d18fe759cc1a49e4
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 7 18:31:25 2019 -0800

    Use controller validation frequency as default values for new periodic validation tasks
---
 .../linkedin/pinot/controller/ControllerConf.java  | 45 +++++++++++-----------
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  6 +--
 .../RealtimeSegmentValidationManager.java          |  2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        | 26 ++++++-------
 4 files changed, 40 insertions(+), 39 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index 9f84691..c6ff82e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -57,7 +57,9 @@ public class ControllerConf extends PropertiesConfiguration {
 
   public static class ControllerPeriodicTasksConf {
     private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
-    private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
+    @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
+    private static final String DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
+        "controller.validation.frequencyInSeconds";
     private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
         "controller.offline.segment.interval.checker.frequencyInSeconds";
     private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
@@ -76,7 +78,8 @@ public class ControllerConf extends PropertiesConfiguration {
         "controller.segment.level.validation.intervalInSeconds";
 
     private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
-    private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
+    private static final int DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
     private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -353,34 +356,26 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(retentionFrequencyInSeconds));
   }
 
-  /**
-   * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
-   * @return
-   */
-  @Deprecated
-  public int getValidationControllerFrequencyInSeconds() {
-    if (containsKey(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
+  private int getValidationControllerFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
-          (String) getProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+          (String) getProperty(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
     }
-    return ControllerPeriodicTasksConf.DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   /**
-   * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
+   * Returns the config value for controller.offline.segment.interval.checker.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the segment level validation interval. This is done in order to retain the current behavior,
+   * wherein the offline validation tasks were done at segment level validation interval frequency
    * @return
    */
-  public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
-    setProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
-        Integer.toString(validationFrequencyInSeconds));
-  }
-
   public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
     if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
     }
-    return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS;
+    return getSegmentLevelValidationIntervalInSeconds();
   }
 
   public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -388,13 +383,18 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(validationFrequencyInSeconds));
   }
 
+  /**
+   * Returns the config value for controller.realtime.segment.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+   * wherein the realtime validation tasks were done at validation controller frequency
+   * @return
+   */
   public int getRealtimeSegmentValidationFrequencyInSeconds() {
-
     if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS;
+    return getValidationControllerFrequencyInSeconds();
   }
 
   public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -403,8 +403,9 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   /**
-   * Return broker resource validation frequency if present, else return the validation manager frequency
-   * This is so that we can rollout with no config changes to the frequency of this task
+   * Returns the config value for controller.broker.resource.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+   * wherein the broker resource validation tasks were done at validation controller frequency
    * @return
    */
   public int getBrokerResourceValidationFrequencyInSeconds() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e708abc..a77750e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -950,7 +950,7 @@ public class PinotLLCRealtimeSegmentManager {
    *
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and reset it to 0 at the end of validateLLC
    */
-  public void validateLLCSegments(final TableConfig tableConfig) {
+  public void ensureAllPartitionsConsuming(final TableConfig tableConfig) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
     final int partitionCount = getPartitionCount(streamConfig);
@@ -958,7 +958,7 @@ public class PinotLLCRealtimeSegmentManager {
       @Nullable
       @Override
       public IdealState apply(@Nullable IdealState idealState) {
-        return validateLLCSegments(tableConfig, idealState, partitionCount);
+        return ensureAllPartitionsConsuming(tableConfig, idealState, partitionCount);
       }
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
   }
@@ -1078,7 +1078,7 @@ public class PinotLLCRealtimeSegmentManager {
    * TODO: split this method into multiple smaller methods
    */
   @VisibleForTesting
-  protected IdealState validateLLCSegments(final TableConfig tableConfig, IdealState idealState,
+  protected IdealState ensureAllPartitionsConsuming(final TableConfig tableConfig, IdealState idealState,
       final int partitionCount) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 36d7b27..77ed13a 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -92,7 +92,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
         Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
         StreamConfig streamConfig = new StreamConfig(streamConfigMap);
         if (streamConfig.hasLowLevelConsumerType()) {
-          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+          _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
         }
       }
     } catch (Exception e) {
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 36f3386..fd417a5 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -369,7 +369,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       oldMetadataMap.put(entry.getKey(), new LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord()));
     }
     segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
-    IdealState updatedIdealState = segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    IdealState updatedIdealState = segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     Map<String, Map<String, String>> updatedMapFields = updatedIdealState.getRecord().getMapFields();
     Map<String, LLCRealtimeSegmentZKMetadata> updatedMetadataMap = segmentManager._metadataMap;
 
@@ -525,13 +525,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
             // validate that all entries in oldMapFields are unchanged in new ideal state
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           // verify that new segment gets created in ideal state with CONSUMING
           Assert.assertNotNull(idealState.getRecord().getMapFields().get(llcSegmentName.getSegmentName()));
@@ -572,13 +572,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
             // validate nothing changed and try again with disabled
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           // verify that new segment gets created in ideal state with CONSUMING and old segment ONLINE
           Assert.assertNotNull(idealState.getRecord().getMapFields().get(latestSegment.getSegmentName()).values().
@@ -615,7 +615,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
               (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
         } else {
@@ -644,12 +644,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
             Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
             if (tooSoonToCorrect) {
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
               // validate nothing changed and try again with disabled
               verifyNoChangeToOldEntries(oldMapFields, idealState);
               segmentManager.tooSoonToCorrect = false;
             }
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
             verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
           } else {
@@ -677,13 +677,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
               Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
               if (tooSoonToCorrect) {
-                segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+                segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
                 // validate nothing changed and try again with disabled
                 verifyNoChangeToOldEntries(oldMapFields, idealState);
                 segmentManager.tooSoonToCorrect = false;
               }
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
               verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
             } else {
@@ -695,7 +695,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
                   (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
               Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
               // verify that nothing changed
               verifyNoChangeToOldEntries(oldMapFields, idealState);
@@ -830,7 +830,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
 
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     PartitionAssignment partitionAssignment =
         segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
             idealState);
@@ -845,7 +845,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       FakePinotLLCRealtimeSegmentManager segmentManager, TableConfig tableConfig, int nPartitions) {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     return idealStateBuilder.build();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org