You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/19 22:43:23 UTC
(pinot) branch master updated: Support initializing broker tags from config (#12175)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f36cc10f4f Support initializing broker tags from config (#12175)
f36cc10f4f is described below
commit f36cc10f4fa70b00e5875d7b493a66524e6280ee
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Dec 19 14:43:17 2023 -0800
Support initializing broker tags from config (#12175)
---
.../broker/broker/helix/BaseBrokerStarter.java | 25 ++++---
.../tests/HybridClusterIntegrationTest.java | 78 +++++++++++++++++++++-
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
3 files changed, 95 insertions(+), 9 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 4960809565..b95b820f25 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
@@ -439,17 +440,26 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
}
updated |= HelixHelper.removeDisabledPartitions(instanceConfig);
boolean shouldUpdateBrokerResource = false;
- String brokerTag = null;
List<String> instanceTags = instanceConfig.getTags();
if (instanceTags.isEmpty()) {
// This is a new broker (first time joining the cluster)
if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) {
- brokerTag = TagNameUtils.getBrokerTagForTenant(null);
+ instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(null));
shouldUpdateBrokerResource = true;
} else {
- brokerTag = Helix.UNTAGGED_BROKER_INSTANCE;
+ String instanceTagsConfig = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_INSTANCE_TAGS);
+ if (StringUtils.isNotEmpty(instanceTagsConfig)) {
+ for (String instanceTag : StringUtils.split(instanceTagsConfig, ',')) {
+ Preconditions.checkArgument(TagNameUtils.isBrokerTag(instanceTag), "Illegal broker instance tag: %s",
+ instanceTag);
+ instanceConfig.addTag(instanceTag);
+ }
+ shouldUpdateBrokerResource = true;
+ } else {
+ instanceConfig.addTag(Helix.UNTAGGED_BROKER_INSTANCE);
+ }
}
- instanceConfig.addTag(brokerTag);
+ instanceTags = instanceConfig.getTags();
updated = true;
}
if (updated) {
@@ -459,10 +469,9 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
// Update broker resource to include the new broker
long startTimeMs = System.currentTimeMillis();
List<String> tablesAdded = new ArrayList<>();
- HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, Collections.singletonList(brokerTag),
- tablesAdded, null);
- LOGGER.info("Updated broker resource for new joining broker: {} in {}ms, tables added: {}", _instanceId,
- System.currentTimeMillis() - startTimeMs, tablesAdded);
+ HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, instanceTags, tablesAdded, null);
+ LOGGER.info("Updated broker resource for new joining broker: {} with instance tags: {} in {}ms, tables added: {}",
+ _instanceId, instanceTags, System.currentTimeMillis() - startTimeMs, tablesAdded);
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index 2b5c36bc59..73d0dcb9bd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -21,9 +21,14 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.net.URLEncoder;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
+import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -38,6 +43,10 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
/**
* Hybrid cluster integration test that uploads 8 months of data as offline and 6 months of data as realtime (with a
@@ -58,6 +67,11 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
return TENANT_NAME;
}
+ protected void overrideBrokerConf(PinotConfiguration configuration) {
+ configuration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_INSTANCE_TAGS,
+ TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+ }
+
@Override
protected void overrideServerConf(PinotConfiguration configuration) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, false);
@@ -116,10 +130,72 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
startServers(2);
// Create tenants
- createBrokerTenant(TENANT_NAME, 1);
createServerTenant(TENANT_NAME, 1, 1);
}
+ @Test
+ public void testUpdateBrokerResource()
+ throws Exception {
+ // Add a new broker to the cluster
+ BaseBrokerStarter brokerStarter = startOneBroker(1);
+
+ // Check if broker is added to all the tables in broker resource
+ String clusterName = getHelixClusterName();
+ String brokerId = brokerStarter.getInstanceId();
+ IdealState brokerResourceIdealState =
+ _helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (Map<String, String> brokerAssignment : brokerResourceIdealState.getRecord().getMapFields().values()) {
+ assertEquals(brokerAssignment.get(brokerId), CommonConstants.Helix.StateModel.BrokerResourceStateModel.ONLINE);
+ }
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView brokerResourceExternalView =
+ _helixAdmin.getResourceExternalView(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (Map<String, String> brokerAssignment : brokerResourceExternalView.getRecord().getMapFields().values()) {
+ if (!brokerAssignment.containsKey(brokerId)) {
+ return false;
+ }
+ }
+ return true;
+ }, 60_000L, "Failed to find broker in broker resource ExternalView");
+
+ // Stop the broker
+ brokerStarter.stop();
+
+ // Dropping the broker should fail because it is still in the broker resource
+ try {
+ sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId));
+ fail("Dropping instance should fail because it is still in the broker resource");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ // Untag the broker and update the broker resource so that it is removed from the broker resource
+ sendPutRequest(_controllerRequestURLBuilder.forInstanceUpdateTags(brokerId, Collections.emptyList(), true));
+
+ // Check if broker is removed from all the tables in broker resource
+ brokerResourceIdealState =
+ _helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (Map<String, String> brokerAssignment : brokerResourceIdealState.getRecord().getMapFields().values()) {
+ assertFalse(brokerAssignment.containsKey(brokerId));
+ }
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView brokerResourceExternalView =
+ _helixAdmin.getResourceExternalView(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ for (Map<String, String> brokerAssignment : brokerResourceExternalView.getRecord().getMapFields().values()) {
+ if (brokerAssignment.containsKey(brokerId)) {
+ return false;
+ }
+ }
+ return true;
+ }, 60_000L, "Failed to remove broker from broker resource ExternalView");
+
+ // Dropping the broker should success now
+ sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId));
+
+ // Check if broker is dropped from the cluster
+ assertFalse(_helixAdmin.getInstancesInCluster(clusterName).contains(brokerId));
+ }
+
@Test
public void testSegmentMetadataApi()
throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 84bcea58e2..574a81a996 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -234,6 +234,7 @@ public class CommonConstants {
public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
public static final String CONFIG_OF_BROKER_ID = "pinot.broker.instance.id";
+ public static final String CONFIG_OF_BROKER_INSTANCE_TAGS = "pinot.broker.instance.tags";
public static final String CONFIG_OF_BROKER_HOSTNAME = "pinot.broker.hostname";
public static final String CONFIG_OF_SWAGGER_USE_HTTPS = "pinot.broker.swagger.use.https";
// Comma separated list of packages that contains javax service resources.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org