You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/19 22:43:23 UTC

(pinot) branch master updated: Support initializing broker tags from config (#12175)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f36cc10f4f Support initializing broker tags from config (#12175)
f36cc10f4f is described below

commit f36cc10f4fa70b00e5875d7b493a66524e6280ee
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Dec 19 14:43:17 2023 -0800

    Support initializing broker tags from config (#12175)
---
 .../broker/broker/helix/BaseBrokerStarter.java     | 25 ++++---
 .../tests/HybridClusterIntegrationTest.java        | 78 +++++++++++++++++++++-
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 +
 3 files changed, 95 insertions(+), 9 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 4960809565..b95b820f25 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
@@ -439,17 +440,26 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     }
     updated |= HelixHelper.removeDisabledPartitions(instanceConfig);
     boolean shouldUpdateBrokerResource = false;
-    String brokerTag = null;
     List<String> instanceTags = instanceConfig.getTags();
     if (instanceTags.isEmpty()) {
       // This is a new broker (first time joining the cluster)
       if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) {
-        brokerTag = TagNameUtils.getBrokerTagForTenant(null);
+        instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(null));
         shouldUpdateBrokerResource = true;
       } else {
-        brokerTag = Helix.UNTAGGED_BROKER_INSTANCE;
+        String instanceTagsConfig = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_INSTANCE_TAGS);
+        if (StringUtils.isNotEmpty(instanceTagsConfig)) {
+          for (String instanceTag : StringUtils.split(instanceTagsConfig, ',')) {
+            Preconditions.checkArgument(TagNameUtils.isBrokerTag(instanceTag), "Illegal broker instance tag: %s",
+                instanceTag);
+            instanceConfig.addTag(instanceTag);
+          }
+          shouldUpdateBrokerResource = true;
+        } else {
+          instanceConfig.addTag(Helix.UNTAGGED_BROKER_INSTANCE);
+        }
       }
-      instanceConfig.addTag(brokerTag);
+      instanceTags = instanceConfig.getTags();
       updated = true;
     }
     if (updated) {
@@ -459,10 +469,9 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
       // Update broker resource to include the new broker
       long startTimeMs = System.currentTimeMillis();
       List<String> tablesAdded = new ArrayList<>();
-      HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, Collections.singletonList(brokerTag),
-          tablesAdded, null);
-      LOGGER.info("Updated broker resource for new joining broker: {} in {}ms, tables added: {}", _instanceId,
-          System.currentTimeMillis() - startTimeMs, tablesAdded);
+      HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, instanceTags, tablesAdded, null);
+      LOGGER.info("Updated broker resource for new joining broker: {} with instance tags: {} in {}ms, tables added: {}",
+          _instanceId, instanceTags, System.currentTimeMillis() - startTimeMs, tablesAdded);
     }
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index 2b5c36bc59..73d0dcb9bd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -21,9 +21,14 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.net.URLEncoder;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
+import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -38,6 +43,10 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
 
 /**
  * Hybrid cluster integration test that uploads 8 months of data as offline and 6 months of data as realtime (with a
@@ -58,6 +67,11 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     return TENANT_NAME;
   }
 
+  protected void overrideBrokerConf(PinotConfiguration configuration) {
+    configuration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_INSTANCE_TAGS,
+        TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+  }
+
   @Override
   protected void overrideServerConf(PinotConfiguration configuration) {
     configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, false);
@@ -116,10 +130,72 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     startServers(2);
 
     // Create tenants
-    createBrokerTenant(TENANT_NAME, 1);
     createServerTenant(TENANT_NAME, 1, 1);
   }
 
+  @Test
+  public void testUpdateBrokerResource()
+      throws Exception {
+    // Add a new broker to the cluster
+    BaseBrokerStarter brokerStarter = startOneBroker(1);
+
+    // Check if broker is added to all the tables in broker resource
+    String clusterName = getHelixClusterName();
+    String brokerId = brokerStarter.getInstanceId();
+    IdealState brokerResourceIdealState =
+        _helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (Map<String, String> brokerAssignment : brokerResourceIdealState.getRecord().getMapFields().values()) {
+      assertEquals(brokerAssignment.get(brokerId), CommonConstants.Helix.StateModel.BrokerResourceStateModel.ONLINE);
+    }
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView brokerResourceExternalView =
+          _helixAdmin.getResourceExternalView(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+      for (Map<String, String> brokerAssignment : brokerResourceExternalView.getRecord().getMapFields().values()) {
+        if (!brokerAssignment.containsKey(brokerId)) {
+          return false;
+        }
+      }
+      return true;
+    }, 60_000L, "Failed to find broker in broker resource ExternalView");
+
+    // Stop the broker
+    brokerStarter.stop();
+
+    // Dropping the broker should fail because it is still in the broker resource
+    try {
+      sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId));
+      fail("Dropping instance should fail because it is still in the broker resource");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Untag the broker and update the broker resource so that it is removed from the broker resource
+    sendPutRequest(_controllerRequestURLBuilder.forInstanceUpdateTags(brokerId, Collections.emptyList(), true));
+
+    // Check if broker is removed from all the tables in broker resource
+    brokerResourceIdealState =
+        _helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    for (Map<String, String> brokerAssignment : brokerResourceIdealState.getRecord().getMapFields().values()) {
+      assertFalse(brokerAssignment.containsKey(brokerId));
+    }
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView brokerResourceExternalView =
+          _helixAdmin.getResourceExternalView(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+      for (Map<String, String> brokerAssignment : brokerResourceExternalView.getRecord().getMapFields().values()) {
+        if (brokerAssignment.containsKey(brokerId)) {
+          return false;
+        }
+      }
+      return true;
+    }, 60_000L, "Failed to remove broker from broker resource ExternalView");
+
+    // Dropping the broker should succeed now
+    sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId));
+
+    // Check if broker is dropped from the cluster
+    assertFalse(_helixAdmin.getInstancesInCluster(clusterName).contains(brokerId));
+  }
+
   @Test
   public void testSegmentMetadataApi()
       throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 84bcea58e2..574a81a996 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -234,6 +234,7 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
     public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
     public static final String CONFIG_OF_BROKER_ID = "pinot.broker.instance.id";
+    public static final String CONFIG_OF_BROKER_INSTANCE_TAGS = "pinot.broker.instance.tags";
     public static final String CONFIG_OF_BROKER_HOSTNAME = "pinot.broker.hostname";
     public static final String CONFIG_OF_SWAGGER_USE_HTTPS = "pinot.broker.swagger.use.https";
     // Comma separated list of packages that contains javax service resources.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org