You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/25 23:01:35 UTC

[incubator-pinot] branch master updated: Add integration test for SegmentStatusChecker periodic task (#3741)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eb7a918  Add integration test for SegmentStatusChecker periodic task (#3741)
eb7a918 is described below

commit eb7a9182b297e947d13cd76a3b0622f4ef576cf1
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Jan 25 15:01:30 2019 -0800

    Add integration test for SegmentStatusChecker periodic task (#3741)
---
 .../apache/pinot/controller/ControllerConf.java    |  27 +++
 .../apache/pinot/controller/ControllerStarter.java |   5 +
 .../controller/helix/SegmentStatusChecker.java     |  33 +--
 .../helix/core/minion/PinotTaskManager.java        |   3 +-
 .../core/periodictask/ControllerPeriodicTask.java  |  13 --
 .../core/relocation/RealtimeSegmentRelocator.java  |   2 +-
 .../helix/core/retention/RetentionManager.java     |   3 +-
 .../BrokerResourceValidationManager.java           |   2 +-
 .../validation/OfflineSegmentIntervalChecker.java  |   2 +-
 .../RealtimeSegmentValidationManager.java          |   2 +-
 .../controller/helix/SegmentStatusCheckerTest.java |  45 ++--
 .../periodictask/ControllerPeriodicTaskTest.java   |  53 +++--
 .../tasks/SegmentStatusCheckerIntegrationTest.java | 259 +++++++++++++++++++++
 13 files changed, 372 insertions(+), 77 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 31f8abf..29f3f36 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -56,6 +57,7 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
 
   public static class ControllerPeriodicTasksConf {
+    // frequency configs
     private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
     @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
     private static final String DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
@@ -77,6 +79,18 @@ public class ControllerConf extends PropertiesConfiguration {
     private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
         "controller.segment.level.validation.intervalInSeconds";
 
+    // Initial delays
+    private static final String STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS = "controller.statusChecker.initialDelayInSeconds";
+
+    public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
+    public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+
+    private static final Random RANDOM = new Random();
+    private static long getRandomInitialDelayInSeconds() {
+      return MIN_INITIAL_DELAY_IN_SECONDS + RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
+    }
+
+    // Default values
     private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
     private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
     private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -548,4 +562,17 @@ public class ControllerConf extends PropertiesConfiguration {
     return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
         ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
   }
+
+  public long getStatusCheckerInitialDelayInSeconds() {
+    return getLong(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
+        ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+  }
+
+  public void setStatusCheckerInitialDelayInSeconds(long initialDelayInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS, initialDelayInSeconds);
+  }
+
+  public long getPeriodicTaskInitialDelayInSeconds() {
+    return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 5d1546e..ab0a7ad 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -352,6 +352,11 @@ public class ControllerStarter {
     return _metricsRegistry;
   }
 
+  @VisibleForTesting
+  public ControllerMetrics getControllerMetrics() {
+    return _controllerMetrics;
+  }
+
   public static ControllerStarter startDefault() {
     return startDefault(null);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 64d8aac..3cb1ee4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -64,7 +64,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
    */
   public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
       ControllerMetrics metricsRegistry) {
-    super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), pinotHelixResourceManager);
+    super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
+        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager);
 
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
     _metricsRegistry = metricsRegistry;
@@ -118,19 +119,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
       } else {
         _realTimeTableCount++;
       }
+
       IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
-      if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
-        int nReplicasFromIdealState = 1;
-        try {
-          if (idealState != null) {
-            nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
-          }
-        } catch (NumberFormatException e) {
-          // Ignore
-        }
-        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
-        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
-        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+
+      if (idealState == null) {
+        LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
+        resetTableMetrics(tableNameWithType);
         return;
       }
 
@@ -143,6 +137,19 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
         return;
       }
 
+      if (idealState.getPartitionSet().isEmpty()) {
+        int nReplicasFromIdealState = 1;
+        try {
+          nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
+        } catch (NumberFormatException e) {
+          // Ignore
+        }
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+        return;
+      }
+
       _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
           idealState.toString().length());
       _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0d15c80..2052f77 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -60,7 +60,8 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager helixTaskResourceManager,
       @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf,
       @Nonnull ControllerMetrics controllerMetrics) {
-    super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), helixResourceManager);
+    super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
+        controllerConf.getPeriodicTaskInitialDelayInSeconds(), helixResourceManager);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 9416c1c..40cd982 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix.core.periodictask;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
-import java.util.Random;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.slf4j.Logger;
@@ -33,10 +32,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
-  private static final Random RANDOM = new Random();
 
-  public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
-  public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
 
   private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
 
@@ -51,15 +47,6 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
     _pinotHelixResourceManager = pinotHelixResourceManager;
   }
 
-  public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager) {
-    this(taskName, runFrequencyInSeconds, getRandomInitialDelayInSeconds(), pinotHelixResourceManager);
-  }
-
-  private static long getRandomInitialDelayInSeconds() {
-    return MIN_INITIAL_DELAY_IN_SECONDS + RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
-  }
-
   /**
    * Reset flags, and call initTask which initializes each individual task
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 6129669..a5a81de 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -56,7 +56,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
 
   public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
     super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
-        pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index d9aa1d9..25a2db8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -54,7 +54,8 @@ public class RetentionManager extends ControllerPeriodicTask {
   private final int _deletedSegmentsRetentionInDays;
 
   public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
-    super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+    super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
     _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 03bfd79..c969ab1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -39,7 +39,7 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
 
   public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
     super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
-        pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 41bb950..14b9827 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -50,7 +50,7 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
   public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       ValidationMetrics validationMetrics) {
     super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
-        pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
     _validationMetrics = validationMetrics;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 94aac5c..245437f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -55,7 +55,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index db33605..c3cddaaf 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.google.common.collect.Lists;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.List;
@@ -266,7 +267,7 @@ public class SegmentStatusCheckerTest {
   @Test
   public void missingIdealTest() throws Exception {
     final String tableName = "myTable_REALTIME";
-    List<String> allTableNames = new ArrayList<String>();
+    List<String> allTableNames = new ArrayList<>();
     allTableNames.add(tableName);
 
     {
@@ -288,9 +289,9 @@ public class SegmentStatusCheckerTest {
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
         ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE);
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
-        ControllerGauge.NUMBER_OF_REPLICAS), 1);
+        ControllerGauge.NUMBER_OF_REPLICAS), Long.MIN_VALUE);
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
-        ControllerGauge.PERCENT_OF_REPLICAS), 100);
+        ControllerGauge.PERCENT_OF_REPLICAS), Long.MIN_VALUE);
   }
 
   @Test
@@ -391,11 +392,20 @@ public class SegmentStatusCheckerTest {
   }
 
   @Test
-  public void noIdealState() throws Exception {
-    final String tableName = "myTable_REALTIME";
+  public void disabledTableTest() throws Exception {
+
+    final String tableName = "myTable_OFFLINE";
     List<String> allTableNames = new ArrayList<String>();
     allTableNames.add(tableName);
-    IdealState idealState = null;
+    IdealState idealState = new IdealState(tableName);
+    // disable table in idealstate
+    idealState.enable(false);
+    idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE");
+    idealState.setPartitionState("myTable_OFFLINE", "pinot2", "OFFLINE");
+    idealState.setPartitionState("myTable_OFFLINE", "pinot3", "OFFLINE");
+    idealState.setReplicas("1");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
@@ -410,30 +420,25 @@ public class SegmentStatusCheckerTest {
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    // verify state before test
+    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
+        ControllerGauge.DISABLED_TABLE_COUNT), 0);
+    // update metrics
     segmentStatusChecker.init();
     segmentStatusChecker.run();
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
-        ControllerGauge.NUMBER_OF_REPLICAS), 1);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
-        ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
+        ControllerGauge.DISABLED_TABLE_COUNT), 1);
   }
 
+
   @Test
-  public void disabledTableTest() throws Exception {
+  public void disabledEmptyTableTest() throws Exception {
 
     final String tableName = "myTable_OFFLINE";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
+    List<String> allTableNames = Lists.newArrayList(tableName);
     IdealState idealState = new IdealState(tableName);
     // disable table in idealstate
     idealState.enable(false);
-    idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE");
-    idealState.setPartitionState("myTable_OFFLINE", "pinot2", "OFFLINE");
-    idealState.setPartitionState("myTable_OFFLINE", "pinot3", "OFFLINE");
     idealState.setReplicas("1");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index c759b9e..29a96de 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
@@ -36,6 +37,7 @@ import static org.testng.Assert.assertTrue;
 
 public class ControllerPeriodicTaskTest {
   private static final long RUN_FREQUENCY_IN_SECONDS = 30;
+  private final ControllerConf _controllerConf = new ControllerConf();
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
@@ -44,31 +46,30 @@ public class ControllerPeriodicTaskTest {
   private final AtomicInteger _numTablesProcessed = new AtomicInteger();
   private final int _numTables = 3;
 
-  private final MockControllerPeriodicTask _task =
-      new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
+  private final MockControllerPeriodicTask _task = new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS,
+      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager) {
 
-        @Override
-        protected void initTask() {
-          _initTaskCalled.set(true);
-        }
-
-        @Override
-        public void stopTask() {
-          _stopTaskCalled.set(true);
-        }
+    @Override
+    protected void initTask() {
+      _initTaskCalled.set(true);
+    }
 
-        @Override
-        public void process(List<String> tableNamesWithType) {
-          _processCalled.set(true);
-          super.process(tableNamesWithType);
-        }
+    @Override
+    public void stopTask() {
+      _stopTaskCalled.set(true);
+    }
 
-        @Override
-        public void processTable(String tableNameWithType) {
-          _numTablesProcessed.getAndIncrement();
-        }
+    @Override
+    public void process(List<String> tableNamesWithType) {
+      _processCalled.set(true);
+      super.process(tableNamesWithType);
+    }
 
-      };
+    @Override
+    public void processTable(String tableNameWithType) {
+      _numTablesProcessed.getAndIncrement();
+    }
+  };
 
   @BeforeTest
   public void beforeTest() {
@@ -86,8 +87,10 @@ public class ControllerPeriodicTaskTest {
 
   @Test
   public void testRandomInitialDelay() {
-    assertTrue(_task.getInitialDelayInSeconds() >= ControllerPeriodicTask.MIN_INITIAL_DELAY_IN_SECONDS);
-    assertTrue(_task.getInitialDelayInSeconds() < ControllerPeriodicTask.MAX_INITIAL_DELAY_IN_SECONDS);
+    assertTrue(
+        _task.getInitialDelayInSeconds() >= ControllerConf.ControllerPeriodicTasksConf.MIN_INITIAL_DELAY_IN_SECONDS);
+    assertTrue(
+        _task.getInitialDelayInSeconds() < ControllerConf.ControllerPeriodicTasksConf.MAX_INITIAL_DELAY_IN_SECONDS);
 
     assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS);
   }
@@ -134,9 +137,9 @@ public class ControllerPeriodicTaskTest {
 
   private class MockControllerPeriodicTask extends ControllerPeriodicTask {
 
-    public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
+    public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
         PinotHelixResourceManager pinotHelixResourceManager) {
-      super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
+      super(taskName, runFrequencyInSeconds, initialDelayInSeconds, pinotHelixResourceManager);
     }
 
     @Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
new file mode 100644
index 0000000..deefc4a
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.controller.periodic.tasks;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.KafkaStarterUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test to check {@link org.apache.pinot.controller.helix.SegmentStatusChecker} is
+ * running and verify the metrics emitted
+ */
+public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private String emptyTable = "table1_OFFLINE";
+  private String disabledOfflineTable = "table2_OFFLINE";
+  private String basicOfflineTable = "table3_OFFLINE";
+  private String errorOfflineTable = "table4_OFFLINE";
+  private String realtimeTableErrorState = "table5_REALTIME";
+
+  private static final int SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS = 60;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    startZk();
+
+    // Set initial delay of 60 seconds for the segment status checker, to allow time for tables setup.
+    // By default, it will pick a random delay between 120s and 300s
+    ControllerConf controllerConf = getDefaultControllerConfiguration();
+    controllerConf.setStatusCheckerInitialDelayInSeconds(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS);
+
+    startController(controllerConf);
+
+    startBroker();
+
+    startServers(3);
+
+    // empty table
+    setupOfflineTable(emptyTable);
+
+    // table with disabled ideal state
+    setupOfflineTable(disabledOfflineTable);
+    _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
+
+    // happy case table
+    setupOfflineTableAndSegments(basicOfflineTable);
+
+    // some segments offline
+    setupOfflineTableAndSegments(errorOfflineTable);
+    HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new Function<IdealState, IdealState>() {
+      @Nullable
+      @Override
+      public IdealState apply(@Nullable IdealState input) {
+        List<String> segmentNames = Lists.newArrayList(input.getPartitionSet());
+        Collections.sort(segmentNames);
+
+        Map<String, String> instanceStateMap1 = input.getInstanceStateMap(segmentNames.get(0));
+        for (String instance : instanceStateMap1.keySet()) {
+          instanceStateMap1.put(instance, "OFFLINE");
+          break;
+        }
+        return input;
+      }
+    }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
+
+    // realtime table with segments in error state
+    setupRealtimeTable(realtimeTableErrorState);
+
+    // we need to wait for SegmentStatusChecker to finish at least 1 run
+    Thread.sleep(TimeUnit.MILLISECONDS.convert(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS + 10, TimeUnit.SECONDS));
+  }
+
+  private void setupOfflineTable(String table) throws Exception {
+    _realtimeTableConfig = null;
+    addOfflineTable(table);
+    completeTableConfiguration();
+  }
+
+  /**
+   * Creates an offline table, builds segments for it from the unpacked test Avro data, and uploads them.
+   *
+   * <p>{@code _segmentDir} and {@code _tarDir} are shared by every call of this method, and
+   * {@code uploadSegments(_tarDir)} is handed the whole tar directory. Both directories are therefore reset first,
+   * so tar files built for a previously set up table are not uploaded again (assuming {@code uploadSegments}
+   * uploads every file it finds in the directory).
+   *
+   * @param table name of the offline table to create and populate
+   */
+  private void setupOfflineTableAndSegments(String table) throws Exception {
+    setupOfflineTable(table);
+
+    // Start from empty output directories so only this table's segments are uploaded below.
+    FileUtils.deleteDirectory(_segmentDir);
+    FileUtils.forceMkdir(_segmentDir);
+    FileUtils.deleteDirectory(_tarDir);
+    FileUtils.forceMkdir(_tarDir);
+
+    List<File> avroFiles = unpackAvroData(_tempDir);
+    ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, table, false, null, null,
+          null, executor);
+    } finally {
+      // Always release the pool, even if building/submitting the segment tasks fails.
+      executor.shutdown();
+    }
+    // Do not upload a partially built set of segments if the build did not finish in time.
+    Assert.assertTrue(executor.awaitTermination(10, TimeUnit.MINUTES),
+        "Timed out building segments for table: " + table);
+    uploadSegments(_tarDir);
+  }
+
+  /**
+   * Registers the test schema and creates a realtime table that uses it, then completes the table configuration.
+   *
+   * <p>The schema must declare a time column and an outgoing time unit; both are asserted before the table is added.
+   *
+   * @param table name of the realtime table to create
+   */
+  private void setupRealtimeTable(String table) throws Exception {
+    // Clear any offline config left by an earlier setup call.
+    // NOTE(review): presumably so completeTableConfiguration() only acts on the realtime config; confirm in base class.
+    _offlineTableConfig = null;
+
+    File realtimeSchemaFile = getSchemaFile();
+    Schema realtimeSchema = Schema.fromFile(realtimeSchemaFile);
+    String realtimeSchemaName = realtimeSchema.getSchemaName();
+    addSchema(realtimeSchemaFile, realtimeSchemaName);
+
+    String timeColumn = realtimeSchema.getTimeColumnName();
+    Assert.assertNotNull(timeColumn);
+    TimeUnit timeUnit = realtimeSchema.getOutgoingTimeUnit();
+    Assert.assertNotNull(timeUnit);
+
+    addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
+        getKafkaTopic(), getRealtimeSegmentFlushSize(), null, timeColumn, timeUnit.toString(), realtimeSchemaName,
+        null, null, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
+        getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
+    completeTableConfiguration();
+  }
+
+  /**
+   * After 1 run of SegmentStatusChecker the controllerMetrics will be set for each table
+   * Validate that we are seeing the expected numbers
+   */
+  @Test
+  public void testSegmentStatusChecker() {
+    ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
+
+    // empty table - table1_OFFLINE
+    // num replicas set from ideal state
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.NUMBER_OF_REPLICAS), 3);
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.PERCENT_OF_REPLICAS), 100);
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
+        100);
+
+    // disabled table - table2_OFFLINE
+    // reset to defaults
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
+        Long.MIN_VALUE);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
+        Long.MIN_VALUE);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
+        Long.MIN_VALUE);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
+        Long.MIN_VALUE);
+
+    // happy path table - table3_OFFLINE
+    IdealState idealState = _helixResourceManager.getTableIdealState(basicOfflineTable);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+        idealState.toString().length());
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.SEGMENT_COUNT),
+        (long) (idealState.getPartitionSet().size()));
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
+        3);
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
+        100);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+
+    // offline segments - table4_OFFLINE
+    // 2 replicas available out of 3, percent 66
+    idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+        idealState.toString().length());
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.SEGMENT_COUNT),
+        (long) (idealState.getPartitionSet().size()));
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
+        2);
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
+        66);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+
+    // error segments - table5_REALTIME
+    // no replicas available, all segments in error state
+    idealState = _helixResourceManager.getTableIdealState(realtimeTableErrorState);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+        idealState.toString().length());
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.SEGMENT_COUNT),
+        (long) (idealState.getPartitionSet().size()));
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.NUMBER_OF_REPLICAS), 0);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.PERCENT_OF_REPLICAS), 0);
+    Assert.assertTrue(
+        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.SEGMENTS_IN_ERROR_STATE) > 0);
+    Assert.assertEquals(
+        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 0);
+
+    // Total metrics
+    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT), 4);
+    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT), 1);
+    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1);
+  }
+
+  /**
+   * Always returns {@code true} for this test.
+   * NOTE(review): the flag's effect is defined in the base test class; presumably it selects the newer
+   * table-config format when creating tables. Confirm there.
+   */
+  @Override
+  protected boolean isUsingNewConfigFormat() {
+    return true;
+  }
+
+  /**
+   * Drops every table created during setup, then stops the servers, broker, controller and ZooKeeper and deletes
+   * the temp directory.
+   *
+   * <p>The shutdown steps run in a {@code finally} block so a failure while dropping a table does not leave the
+   * cluster components running (and the temp directory on disk) for subsequent test classes.
+   */
+  @AfterClass
+  public void tearDown() throws Exception {
+    try {
+      dropRealtimeTable(realtimeTableErrorState);
+      dropOfflineTable(emptyTable);
+      dropOfflineTable(disabledOfflineTable);
+      dropOfflineTable(basicOfflineTable);
+      dropOfflineTable(errorOfflineTable);
+    } finally {
+      stopServer();
+      stopBroker();
+      stopController();
+      stopZk();
+      FileUtils.deleteDirectory(_tempDir);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org