You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/05/16 16:57:28 UTC

[incubator-pinot] branch master updated: Add integration test for BrokerResourceValidationManager (#4096)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cca15f  Add integration test for BrokerResourceValidationManager (#4096)
3cca15f is described below

commit 3cca15fe810b5e01fc76cc94de92749e73d656ba
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu May 16 09:57:22 2019 -0700

    Add integration test for BrokerResourceValidationManager (#4096)
---
 .../apache/pinot/controller/ControllerConf.java    | 12 +++
 .../BrokerResourceValidationManager.java           |  2 +-
 .../ControllerPeriodicTasksIntegrationTests.java   | 90 +++++++++++++++++++++-
 3 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 2372ac0..7fc638a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -78,6 +78,8 @@ public class ControllerConf extends PropertiesConfiguration {
         "controller.realtime.segment.validation.frequencyInSeconds";
     private static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
         "controller.broker.resource.validation.frequencyInSeconds";
+    private static final String BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
+        "controller.broker.resource.validation.initialDelayInSeconds";
     private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
     private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
         "controller.statuschecker.waitForPushTimeInSeconds";
@@ -453,6 +455,16 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(validationFrequencyInSeconds));
   }
 
+  public long getBrokerResourceValidationInitialDelayInSeconds() {
+    return getLong(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
+        getPeriodicTaskInitialDelayInSeconds());
+  }
+
+  public void setBrokerResourceValidationInitialDelayInSeconds(long validationInitialDelayInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
+        validationInitialDelayInSeconds);
+  }
+
   public int getStatusCheckerFrequencyInSeconds() {
     if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 8f7e1c0..5748d3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -39,7 +39,7 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok
   public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       ControllerMetrics controllerMetrics) {
     super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
-        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
   }
 
   @Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index 9535cf6..77ca404 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -29,17 +29,23 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.config.TagOverrideConfig;
 import org.apache.pinot.common.config.TenantConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.KafkaStarterUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
@@ -68,7 +74,7 @@ import org.testng.annotations.Test;
  * See group = "segmentStatusChecker" for example.
  * The tables needed for the test will be created in beforeTask(), and dropped in afterTask()
  *
- * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> ....
+ * The groups run sequentially in the order: segmentStatusChecker -> realtimeSegmentRelocation -> brokerResourceValidationManager -> ....
  */
 public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrationTestSet {
 
@@ -101,6 +107,8 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
     controllerConf.setRealtimeSegmentRelocationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
     controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
+    controllerConf.setBrokerResourceValidationInitialDelayInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+    controllerConf.setBrokerResourceValidationFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
 
     startController(controllerConf);
     startBroker();
@@ -386,7 +394,85 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     dropRealtimeTable(relocationTable);
   }
 
-  // TODO: tests for other ControllerPeriodicTasks (RetentionManager, BrokerValidationManager, OfflineSegmentIntervalChecker, RealtimeSegmentValidationManager)
+  @BeforeGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
+  public void beforeBrokerResourceValidationManagerTest(ITestContext context)
+      throws Exception {
+    String table1 = "testTable";
+    String table2 = "testTable2";
+    context.setAttribute("testTableOne", table1);
+    context.setAttribute("testTableTwo", table2);
+    setupOfflineTable(table1);
+  }
+
+  @Test(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
+  public void testBrokerResourceValidationManager(ITestContext context)
+      throws Exception {
+    // Check that the first table we added doesn't need to be rebuilt (the case where ideal state brokers and brokers in broker resource are the same).
+    String table1 = (String) context.getAttribute("testTableOne");
+    String table2 = (String) context.getAttribute("testTableTwo");
+    TableConfig tableConfigOne = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table1).build();
+    String partitionNameOne = tableConfigOne.getTableName();
+
+    // Ensure that the broker resource is not rebuilt.
+    TestUtils.waitForCondition(input -> {
+      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
+      return idealState.getInstanceSet(partitionNameOne)
+          .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
+    }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
+
+    // Add another table that needs to be rebuilt
+    TableConfig offlineTableConfigTwo =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table2)
+            .setBrokerTenant(TENANT_NAME).setServerTenant(TENANT_NAME).build();
+    _helixResourceManager.addTable(offlineTableConfigTwo);
+    String partitionNameTwo = offlineTableConfigTwo.getTableName();
+
+    // Add a new broker manually such that the ideal state is not updated and ensure that rebuild broker resource is called
+    final String brokerId = "Broker_localhost_2";
+    InstanceConfig instanceConfig = new InstanceConfig(brokerId);
+    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setHostName("Broker_localhost");
+    instanceConfig.setPort("2");
+    _helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+    _helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
+        TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+
+    // Count how many times we poll for the ideal state change that the rebuild-broker-resource step is expected to make.
+    AtomicInteger count = new AtomicInteger();
+    TestUtils.waitForCondition(input -> {
+      count.getAndIncrement();
+      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
+      return idealState.getInstanceSet(partitionNameTwo)
+          .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
+    }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
+
+    // The condition must have been polled more than once, i.e. the broker resource was not updated immediately.
+    Assert.assertTrue(count.get() > 1);
+
+    // Drop the instance so that broker resource doesn't match the current one.
+    _helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
+    count.set(0);
+    TestUtils.waitForCondition(input -> {
+      count.getAndIncrement();
+      IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName());
+      return idealState.getInstanceSet(partitionNameTwo)
+          .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME));
+    }, 60_000L, "Timeout when waiting for broker resource to be rebuilt");
+
+    // Again, more than one poll is expected, i.e. the broker resource was not updated immediately after the drop.
+    Assert.assertTrue(count.get() > 1);
+  }
+
+  @AfterGroups(groups = "brokerResourceValidationManager", dependsOnGroups = "realtimeSegmentRelocator")
+  public void afterBrokerResourceValidationManagerTest(ITestContext context)
+      throws Exception {
+    String table1 = (String) context.getAttribute("testTableOne");
+    String table2 = (String) context.getAttribute("testTableTwo");
+    dropOfflineTable(table1);
+    dropOfflineTable(table2);
+  }
+
+  // TODO: tests for other ControllerPeriodicTasks (RetentionManager, OfflineSegmentIntervalChecker, RealtimeSegmentValidationManager)
 
   @Override
   protected boolean isUsingNewConfigFormat() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org