You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/25 23:01:35 UTC
[incubator-pinot] branch master updated: Add integration test for SegmentStatusChecker periodic task (#3741)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eb7a918 Add integration test for SegmentStatusChecker periodic task (#3741)
eb7a918 is described below
commit eb7a9182b297e947d13cd76a3b0622f4ef576cf1
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Jan 25 15:01:30 2019 -0800
Add integration test for SegmentStatusChecker periodic task (#3741)
---
.../apache/pinot/controller/ControllerConf.java | 27 +++
.../apache/pinot/controller/ControllerStarter.java | 5 +
.../controller/helix/SegmentStatusChecker.java | 33 +--
.../helix/core/minion/PinotTaskManager.java | 3 +-
.../core/periodictask/ControllerPeriodicTask.java | 13 --
.../core/relocation/RealtimeSegmentRelocator.java | 2 +-
.../helix/core/retention/RetentionManager.java | 3 +-
.../BrokerResourceValidationManager.java | 2 +-
.../validation/OfflineSegmentIntervalChecker.java | 2 +-
.../RealtimeSegmentValidationManager.java | 2 +-
.../controller/helix/SegmentStatusCheckerTest.java | 45 ++--
.../periodictask/ControllerPeriodicTaskTest.java | 53 +++--
.../tasks/SegmentStatusCheckerIntegrationTest.java | 259 +++++++++++++++++++++
13 files changed, 372 insertions(+), 77 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 31f8abf..29f3f36 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -56,6 +57,7 @@ public class ControllerConf extends PropertiesConfiguration {
private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
public static class ControllerPeriodicTasksConf {
+ // Frequency configs
private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
@Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
private static final String DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
@@ -77,6 +79,18 @@ public class ControllerConf extends PropertiesConfiguration {
private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
"controller.segment.level.validation.intervalInSeconds";
+ // Initial delays
+ private static final String STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS = "controller.statusChecker.initialDelayInSeconds";
+
+ public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
+ public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
+
+ private static final Random RANDOM = new Random();
+ private static long getRandomInitialDelayInSeconds() {
+ return MIN_INITIAL_DELAY_IN_SECONDS + RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
+ }
+
+ // Default values
private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -548,4 +562,17 @@ public class ControllerConf extends PropertiesConfiguration {
return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
}
+
+ public long getStatusCheckerInitialDelayInSeconds() {
+ return getLong(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
+ ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+ }
+
+ public void setStatusCheckerInitialDelayInSeconds(long initialDelayInSeconds) {
+ setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS, initialDelayInSeconds);
+ }
+
+ public long getPeriodicTaskInitialDelayInSeconds() {
+ return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 5d1546e..ab0a7ad 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -352,6 +352,11 @@ public class ControllerStarter {
return _metricsRegistry;
}
+ @VisibleForTesting
+ public ControllerMetrics getControllerMetrics() {
+ return _controllerMetrics;
+ }
+
public static ControllerStarter startDefault() {
return startDefault(null);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 64d8aac..3cb1ee4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -64,7 +64,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
*/
public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
ControllerMetrics metricsRegistry) {
- super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), pinotHelixResourceManager);
+ super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
+ config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager);
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
_metricsRegistry = metricsRegistry;
@@ -118,19 +119,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
} else {
_realTimeTableCount++;
}
+
IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
- if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
- int nReplicasFromIdealState = 1;
- try {
- if (idealState != null) {
- nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
- }
- } catch (NumberFormatException e) {
- // Ignore
- }
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+
+ if (idealState == null) {
+ LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
+ resetTableMetrics(tableNameWithType);
return;
}
@@ -143,6 +137,19 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
return;
}
+ if (idealState.getPartitionSet().isEmpty()) {
+ int nReplicasFromIdealState = 1;
+ try {
+ nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
+ } catch (NumberFormatException e) {
+ // Ignore
+ }
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+ return;
+ }
+
_metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
idealState.toString().length());
_metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0d15c80..2052f77 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -60,7 +60,8 @@ public class PinotTaskManager extends ControllerPeriodicTask {
public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager helixTaskResourceManager,
@Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf,
@Nonnull ControllerMetrics controllerMetrics) {
- super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), helixResourceManager);
+ super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
+ controllerConf.getPeriodicTaskInitialDelayInSeconds(), helixResourceManager);
_helixTaskResourceManager = helixTaskResourceManager;
_clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 9416c1c..40cd982 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix.core.periodictask;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
-import java.util.Random;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.BasePeriodicTask;
import org.slf4j.Logger;
@@ -33,10 +32,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class ControllerPeriodicTask extends BasePeriodicTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
- private static final Random RANDOM = new Random();
- public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
- public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
@@ -51,15 +47,6 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
_pinotHelixResourceManager = pinotHelixResourceManager;
}
- public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
- PinotHelixResourceManager pinotHelixResourceManager) {
- this(taskName, runFrequencyInSeconds, getRandomInitialDelayInSeconds(), pinotHelixResourceManager);
- }
-
- private static long getRandomInitialDelayInSeconds() {
- return MIN_INITIAL_DELAY_IN_SECONDS + RANDOM.nextInt(MAX_INITIAL_DELAY_IN_SECONDS - MIN_INITIAL_DELAY_IN_SECONDS);
- }
-
/**
* Reset flags, and call initTask which initializes each individual task
*/
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 6129669..a5a81de 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -56,7 +56,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
- pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index d9aa1d9..25a2db8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -54,7 +54,8 @@ public class RetentionManager extends ControllerPeriodicTask {
private final int _deletedSegmentsRetentionInDays;
public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
- super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+ super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
_deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 03bfd79..c969ab1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -39,7 +39,7 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
- pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 41bb950..14b9827 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -50,7 +50,7 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
ValidationMetrics validationMetrics) {
super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
- pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
_validationMetrics = validationMetrics;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 94aac5c..245437f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -55,7 +55,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
- pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index db33605..c3cddaaf 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix;
+import com.google.common.collect.Lists;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.List;
@@ -266,7 +267,7 @@ public class SegmentStatusCheckerTest {
@Test
public void missingIdealTest() throws Exception {
final String tableName = "myTable_REALTIME";
- List<String> allTableNames = new ArrayList<String>();
+ List<String> allTableNames = new ArrayList<>();
allTableNames.add(tableName);
{
@@ -288,9 +289,9 @@ public class SegmentStatusCheckerTest {
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE);
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
- ControllerGauge.NUMBER_OF_REPLICAS), 1);
+ ControllerGauge.NUMBER_OF_REPLICAS), Long.MIN_VALUE);
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
- ControllerGauge.PERCENT_OF_REPLICAS), 100);
+ ControllerGauge.PERCENT_OF_REPLICAS), Long.MIN_VALUE);
}
@Test
@@ -391,11 +392,20 @@ public class SegmentStatusCheckerTest {
}
@Test
- public void noIdealState() throws Exception {
- final String tableName = "myTable_REALTIME";
+ public void disabledTableTest() throws Exception {
+
+ final String tableName = "myTable_OFFLINE";
List<String> allTableNames = new ArrayList<String>();
allTableNames.add(tableName);
- IdealState idealState = null;
+ IdealState idealState = new IdealState(tableName);
+ // disable table in idealstate
+ idealState.enable(false);
+ idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE");
+ idealState.setPartitionState("myTable_OFFLINE", "pinot2", "OFFLINE");
+ idealState.setPartitionState("myTable_OFFLINE", "pinot3", "OFFLINE");
+ idealState.setReplicas("1");
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
{
helixResourceManager = mock(PinotHelixResourceManager.class);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
@@ -410,30 +420,25 @@ public class SegmentStatusCheckerTest {
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ // verify state before test
+ Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
+ ControllerGauge.DISABLED_TABLE_COUNT), 0);
+ // update metrics
segmentStatusChecker.init();
segmentStatusChecker.run();
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
- ControllerGauge.NUMBER_OF_REPLICAS), 1);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
- ControllerGauge.PERCENT_OF_REPLICAS), 100);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+ Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
+ ControllerGauge.DISABLED_TABLE_COUNT), 1);
}
+
@Test
- public void disabledTableTest() throws Exception {
+ public void disabledEmptyTableTest() throws Exception {
final String tableName = "myTable_OFFLINE";
- List<String> allTableNames = new ArrayList<String>();
- allTableNames.add(tableName);
+ List<String> allTableNames = Lists.newArrayList(tableName);
IdealState idealState = new IdealState(tableName);
// disable table in idealstate
idealState.enable(false);
- idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE");
- idealState.setPartitionState("myTable_OFFLINE", "pinot2", "OFFLINE");
- idealState.setPartitionState("myTable_OFFLINE", "pinot3", "OFFLINE");
idealState.setReplicas("1");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index c759b9e..29a96de 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
+import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -36,6 +37,7 @@ import static org.testng.Assert.assertTrue;
public class ControllerPeriodicTaskTest {
private static final long RUN_FREQUENCY_IN_SECONDS = 30;
+ private final ControllerConf _controllerConf = new ControllerConf();
private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
@@ -44,31 +46,30 @@ public class ControllerPeriodicTaskTest {
private final AtomicInteger _numTablesProcessed = new AtomicInteger();
private final int _numTables = 3;
- private final MockControllerPeriodicTask _task =
- new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
+ private final MockControllerPeriodicTask _task = new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS,
+ _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager) {
- @Override
- protected void initTask() {
- _initTaskCalled.set(true);
- }
-
- @Override
- public void stopTask() {
- _stopTaskCalled.set(true);
- }
+ @Override
+ protected void initTask() {
+ _initTaskCalled.set(true);
+ }
- @Override
- public void process(List<String> tableNamesWithType) {
- _processCalled.set(true);
- super.process(tableNamesWithType);
- }
+ @Override
+ public void stopTask() {
+ _stopTaskCalled.set(true);
+ }
- @Override
- public void processTable(String tableNameWithType) {
- _numTablesProcessed.getAndIncrement();
- }
+ @Override
+ public void process(List<String> tableNamesWithType) {
+ _processCalled.set(true);
+ super.process(tableNamesWithType);
+ }
- };
+ @Override
+ public void processTable(String tableNameWithType) {
+ _numTablesProcessed.getAndIncrement();
+ }
+ };
@BeforeTest
public void beforeTest() {
@@ -86,8 +87,10 @@ public class ControllerPeriodicTaskTest {
@Test
public void testRandomInitialDelay() {
- assertTrue(_task.getInitialDelayInSeconds() >= ControllerPeriodicTask.MIN_INITIAL_DELAY_IN_SECONDS);
- assertTrue(_task.getInitialDelayInSeconds() < ControllerPeriodicTask.MAX_INITIAL_DELAY_IN_SECONDS);
+ assertTrue(
+ _task.getInitialDelayInSeconds() >= ControllerConf.ControllerPeriodicTasksConf.MIN_INITIAL_DELAY_IN_SECONDS);
+ assertTrue(
+ _task.getInitialDelayInSeconds() < ControllerConf.ControllerPeriodicTasksConf.MAX_INITIAL_DELAY_IN_SECONDS);
assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS);
}
@@ -134,9 +137,9 @@ public class ControllerPeriodicTaskTest {
private class MockControllerPeriodicTask extends ControllerPeriodicTask {
- public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
+ public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager) {
- super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
+ super(taskName, runFrequencyInSeconds, initialDelayInSeconds, pinotHelixResourceManager);
}
@Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
new file mode 100644
index 0000000..deefc4a
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.controller.periodic.tasks;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.KafkaStarterUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test to check that {@link org.apache.pinot.controller.helix.SegmentStatusChecker} is
+ * running and to verify the metrics it emits.
+ */
+public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationTestSet {
+
+ private String emptyTable = "table1_OFFLINE";
+ private String disabledOfflineTable = "table2_OFFLINE";
+ private String basicOfflineTable = "table3_OFFLINE";
+ private String errorOfflineTable = "table4_OFFLINE";
+ private String realtimeTableErrorState = "table5_REALTIME";
+
+ private static final int SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS = 60;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ startZk();
+
+ // Set initial delay of 60 seconds for the segment status checker, to allow time for tables setup.
+ // By default, it will pick a random delay between 120s and 300s
+ ControllerConf controllerConf = getDefaultControllerConfiguration();
+ controllerConf.setStatusCheckerInitialDelayInSeconds(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS);
+
+ startController(controllerConf);
+
+ startBroker();
+
+ startServers(3);
+
+ // empty table
+ setupOfflineTable(emptyTable);
+
+ // table with disabled ideal state
+ setupOfflineTable(disabledOfflineTable);
+ _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
+
+ // happy case table
+ setupOfflineTableAndSegments(basicOfflineTable);
+
+ // some segments offline
+ setupOfflineTableAndSegments(errorOfflineTable);
+ HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new Function<IdealState, IdealState>() {
+ @Nullable
+ @Override
+ public IdealState apply(@Nullable IdealState input) {
+ List<String> segmentNames = Lists.newArrayList(input.getPartitionSet());
+ Collections.sort(segmentNames);
+
+ Map<String, String> instanceStateMap1 = input.getInstanceStateMap(segmentNames.get(0));
+ for (String instance : instanceStateMap1.keySet()) {
+ instanceStateMap1.put(instance, "OFFLINE");
+ break;
+ }
+ return input;
+ }
+ }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
+
+ // realtime table with segments in error state
+ setupRealtimeTable(realtimeTableErrorState);
+
+ // we need to wait for SegmentStatusChecker to finish at least 1 run
+ Thread.sleep(TimeUnit.MILLISECONDS.convert(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS + 10, TimeUnit.SECONDS));
+ }
+
+ private void setupOfflineTable(String table) throws Exception {
+ _realtimeTableConfig = null;
+ addOfflineTable(table);
+ completeTableConfiguration();
+ }
+
+ private void setupOfflineTableAndSegments(String table) throws Exception {
+ _realtimeTableConfig = null;
+ addOfflineTable(table);
+ completeTableConfiguration();
+ List<File> avroFiles = unpackAvroData(_tempDir);
+ ExecutorService executor = Executors.newCachedThreadPool();
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, table, false, null, null,
+ null, executor);
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+ uploadSegments(_tarDir);
+ }
+
+ private void setupRealtimeTable(String table) throws Exception {
+ _offlineTableConfig = null;
+ File schemaFile = getSchemaFile();
+ Schema schema = Schema.fromFile(schemaFile);
+ String schemaName = schema.getSchemaName();
+ addSchema(schemaFile, schemaName);
+
+ String timeColumnName = schema.getTimeColumnName();
+ Assert.assertNotNull(timeColumnName);
+ TimeUnit outgoingTimeUnit = schema.getOutgoingTimeUnit();
+ Assert.assertNotNull(outgoingTimeUnit);
+ String timeType = outgoingTimeUnit.toString();
+
+ addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
+ getKafkaTopic(), getRealtimeSegmentFlushSize(), null, timeColumnName, timeType, schemaName, null, null,
+ getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
+ getTaskConfig(), getStreamConsumerFactoryClassName());
+ completeTableConfiguration();
+ }
+
+ /**
+ * After one run of SegmentStatusChecker, the table gauges in controllerMetrics are set for each table.
+ * Validate that they hold the expected values.
+ */
+ @Test
+ public void testSegmentStatusChecker() {
+ ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
+
+ // empty table - table1_OFFLINE
+ // num replicas set from ideal state
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.NUMBER_OF_REPLICAS), 3);
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.PERCENT_OF_REPLICAS), 100);
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
+ 100);
+
+ // disabled table - table2_OFFLINE
+ // reset to defaults
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
+ Long.MIN_VALUE);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
+ Long.MIN_VALUE);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
+ Long.MIN_VALUE);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
+ Long.MIN_VALUE);
+
+ // happy path table - table3_OFFLINE
+ IdealState idealState = _helixResourceManager.getTableIdealState(basicOfflineTable);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+ idealState.toString().length());
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.SEGMENT_COUNT),
+ (long) (idealState.getPartitionSet().size()));
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
+ 3);
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
+ 100);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+
+ // offline segments - table4_OFFLINE
+ // 2 replicas available out of 3, percent 66
+ idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+ idealState.toString().length());
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.SEGMENT_COUNT),
+ (long) (idealState.getPartitionSet().size()));
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
+ 2);
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
+ 66);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+
+ // error segments - table5_REALTIME
+ // no replicas available, all segments in error state
+ idealState = _helixResourceManager.getTableIdealState(realtimeTableErrorState);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+ idealState.toString().length());
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.SEGMENT_COUNT),
+ (long) (idealState.getPartitionSet().size()));
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.NUMBER_OF_REPLICAS), 0);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.PERCENT_OF_REPLICAS), 0);
+ Assert.assertTrue(
+ controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.SEGMENTS_IN_ERROR_STATE) > 0);
+ Assert.assertEquals(
+ controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 0);
+
+ // Total metrics
+ Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT), 4);
+ Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT), 1);
+ Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1);
+ }
+
+ /**
+  * Returns {@code true}, opting this test into the new config format.
+  * NOTE(review): what this flag switches is defined by the base test class, not here — confirm there.
+  */
+ @Override
+ protected boolean isUsingNewConfigFormat() {
+ return true;
+ }
+
+ /**
+  * Drops every table created by this test, then stops the server, broker, controller and ZooKeeper
+  * and deletes {@code _tempDir}.
+  *
+  * @throws Exception if dropping a table, stopping a component, or deleting the temp directory fails
+  */
+ @AfterClass
+ public void tearDown() throws Exception {
+   try {
+     dropRealtimeTable(realtimeTableErrorState);
+     dropOfflineTable(emptyTable);
+     dropOfflineTable(disabledOfflineTable);
+     dropOfflineTable(basicOfflineTable);
+     dropOfflineTable(errorOfflineTable);
+   } finally {
+     // Shut the cluster down and remove temp files even if a table drop above failed,
+     // so a failed drop does not leave running components or files behind.
+     stopServer();
+     stopBroker();
+     stopController();
+     stopZk();
+     FileUtils.deleteDirectory(_tempDir);
+   }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org