You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/05/16 16:57:28 UTC
[incubator-pinot] branch master updated: Add integration test for
BrokerResourceValidationManager (#4096)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3cca15f Add integration test for BrokerResourceValidationManager (#4096)
3cca15f is described below
commit 3cca15fe810b5e01fc76cc94de92749e73d656ba
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu May 16 09:57:22 2019 -0700
Add integration test for BrokerResourceValidationManager (#4096)
---
.../apache/pinot/controller/ControllerConf.java | 12 +++
.../BrokerResourceValidationManager.java | 2 +-
.../ControllerPeriodicTasksIntegrationTests.java | 90 +++++++++++++++++++++-
3 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 2372ac0..7fc638a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -78,6 +78,8 @@ public class ControllerConf extends PropertiesConfiguration {
"controller.realtime.segment.validation.frequencyInSeconds";
private static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
"controller.broker.resource.validation.frequencyInSeconds";
+ private static final String BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
+ "controller.broker.resource.validation.initialDelayInSeconds";
private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
"controller.statuschecker.waitForPushTimeInSeconds";
@@ -453,6 +455,16 @@ public class ControllerConf extends PropertiesConfiguration {
Integer.toString(validationFrequencyInSeconds));
}
+ public long getBrokerResourceValidationInitialDelayInSeconds() {
+ return getLong(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
+ getPeriodicTaskInitialDelayInSeconds());
+ }
+
+ public void setBrokerResourceValidationInitialDelayInSeconds(long validationInitialDelayInSeconds) {
+ setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
+ validationInitialDelayInSeconds);
+ }
+
public int getStatusCheckerFrequencyInSeconds() {
if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 8f7e1c0..5748d3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -39,7 +39,7 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok
public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
ControllerMetrics controllerMetrics) {
super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
- config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+ config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
}
@Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index 9535cf6..77ca404 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -29,17 +29,23 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.config.TagOverrideConfig;
import org.apache.pinot.common.config.TenantConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.KafkaStarterUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
@@ -68,7 +74,7 @@ import org.testng.annotations.Test;
* See group = "segmentStatusChecker" for example.
* The tables needed for the test will be created in beforeTask(), and dropped in afterTask()
*
- * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> ....
+ * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> brokerResourceValidationManager -> ....
*/
public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet {
@@ -101,6 +107,8 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
controllerConf.setRealtimeSegmentRelocationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
+ controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+ controllerConf.setBrokerResourceValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
startController(controllerConf);
startBroker();
@@ -386,7 +394,85 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
dropRealtimeTable(relocationTable);
}
- // TODO: tests for other ControllerPeriodicTasks (RetentionManager, BrokerValidationManager, OfflineSegmentIntervalChecker, RealtimeSegmentValidationManager)
+ @BeforeGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
+ public void beforeBrokerResourceValidationManagerTest(ITestContext context)
+ throws Exception {
+ String table1 = "testTable";
+ String table2 = "testTable2";
+ context.setAttribute("testTableOne", table1);
+ context.setAttribute("testTableTwo", table2);
+ setupOfflineTable(table1);
+ }
+
+ @Test(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
+ public void testBrokerResourceValidationManager(ITestContext context)
+ throws Exception {
+ // Check that the first table we added doesn't need to be rebuilt(case where ideal state brokers and brokers in broker resource are the same.
+ String table1 = (String) context.getAttribute("testTableOne");
+ String table2 = (String) context.getAttribute("testTableTwo");
+ TableConfig tableConfigOne = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table1).build();
+ String partitionNameOne = tableConfigOne.getTableName();
+
+ // Ensure that the broker resource is not rebuilt.
+ TestUtils.waitForCondition(input -> {
+ IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
+ return idealState.getInstanceSet(partitionNameOne)
+ .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
+ }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
+
+ // Add another table that needs to be rebuilt
+ TableConfig offlineTableConfigTwo =
+ new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table2)
+ .setBrokerTenant(TENANT_NAME).setServerTenant(TENANT_NAME).build();
+ _helixResourceManager.addTable(offlineTableConfigTwo);
+ String partitionNameTwo = offlineTableConfigTwo.getTableName();
+
+ // Add a new broker manually such that the ideal state is not updated and ensure that rebuild broker resource is called
+ final String brokerId = "Broker_localhost_2";
+ InstanceConfig instanceConfig = new InstanceConfig(brokerId);
+ instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setHostName("Broker_localhost");
+ instanceConfig.setPort("2");
+ _helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+ _helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
+ TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+
+ // Count the number of times we check on ideal state change, which is made by rebuild broker resource method.
+ AtomicInteger count = new AtomicInteger();
+ TestUtils.waitForCondition(input -> {
+ count.getAndIncrement();
+ IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
+ return idealState.getInstanceSet(partitionNameTwo)
+ .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
+ }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
+
+ // At least the broker resource won't be changed immediately.
+ Assert.assertTrue(count.get() > 1);
+
+ // Drop the instance so that broker resource doesn't match the current one.
+ _helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
+ count.set(0);
+ TestUtils.waitForCondition(input -> {
+ count.getAndIncrement();
+ IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
+ return idealState.getInstanceSet(partitionNameTwo)
+ .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
+ }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
+
+ // At least the broker resource won't be changed immediately.
+ Assert.assertTrue(count.get() > 1);
+ }
+
+ @AfterGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
+ public void afterBrokerResourceValidationManagerTest(ITestContext context)
+ throws Exception {
+ String table1 = (String) context.getAttribute("testTableOne");
+ String table2 = (String) context.getAttribute("testTableTwo");
+ dropOfflineTable(table1);
+ dropOfflineTable(table2);
+ }
+
+ // TODO: tests for other ControllerPeriodicTasks (RetentionManager, OfflineSegmentIntervalChecker, RealtimeSegmentValidationManager)
@Override
protected boolean isUsingNewConfigFormat() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org