You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/04/15 17:03:11 UTC

[incubator-pinot] branch master updated: Add validations to table config before updating a table config (#4103)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bb6a43  Add validations to table config before updating a table config (#4103)
2bb6a43 is described below

commit 2bb6a43114f1645057a6c6a1adcceab381e4559e
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Apr 15 10:03:05 2019 -0700

    Add validations to table config before updating a table config (#4103)
---
 .../api/resources/PinotTableRestletResource.java   |   7 +-
 .../helix/core/PinotHelixResourceManager.java      | 160 ++++++++++++--------
 .../helix/core/PinotHelixResourceManagerTest.java  | 166 ++++++++++++++++++++-
 .../tests/BaseClusterIntegrationTest.java          |  11 ++
 .../tests/RealtimeClusterIntegrationTest.java      |   7 +-
 5 files changed, 284 insertions(+), 67 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 52c0107..92da65a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -102,6 +102,11 @@ public class PinotTableRestletResource {
   @Inject
   ExecutorService _executorService;
 
+  /**
+   * API to create a table. Before adding, validations will be done (min number of replicas,
+   * checking offline and realtime table configs match, checking for tenants existing)
+   * @param tableConfigStr
+   */
   @POST
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables")
@@ -302,7 +307,7 @@ public class PinotTableRestletResource {
 
       ensureMinReplicas(tableConfig);
       verifyTableConfigs(tableConfig);
-      _pinotHelixResourceManager.setExistingTableConfig(tableConfig, tableNameWithType, tableType);
+      _pinotHelixResourceManager.updateTableConfig(tableConfig, tableNameWithType, tableType);
     } catch (PinotHelixResourceManager.InvalidTableConfigException e) {
       String errStr = String.format("Failed to update configuration for %s due to: %s", tableName, e.getMessage());
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1ad19fc..a13fab3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -335,6 +336,13 @@ public class PinotHelixResourceManager {
   }
 
   /**
+   * Get all instances with the given tag
+   */
+  public List<String> getInstancesWithTag(String tag) {
+    return HelixHelper.getInstancesWithTag(_helixZkManager, tag);
+  }
+
+  /**
    * Add an instance into the Helix cluster.
    *
    * @param instance Instance to be added
@@ -1033,69 +1041,22 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Table APIs
-   * @throws InvalidTableConfigException
+   * Performs validations of table config and adds the table to zookeeper
+   * @throws InvalidTableConfigException if validations fail
    * @throws TableAlreadyExistsException for offline tables only if the table already exists
    */
   public void addTable(@Nonnull TableConfig tableConfig)
       throws IOException {
     final String tableNameWithType = tableConfig.getTableName();
+    TableType tableType = tableConfig.getTableType();
 
-    TenantConfig tenantConfig;
     if (isSingleTenantCluster()) {
-      tenantConfig = new TenantConfig();
+      TenantConfig tenantConfig = new TenantConfig();
       tenantConfig.setBroker(TagNameUtils.DEFAULT_TENANT_NAME);
       tenantConfig.setServer(TagNameUtils.DEFAULT_TENANT_NAME);
       tableConfig.setTenantConfig(tenantConfig);
-    } else {
-      tenantConfig = tableConfig.getTenantConfig();
-      if (tenantConfig.getBroker() == null || tenantConfig.getServer() == null) {
-        throw new InvalidTableConfigException("Tenant is not configured for table: " + tableNameWithType);
-      }
-    }
-
-    // Check if tenant exists before creating the table
-    TableType tableType = tableConfig.getTableType();
-    String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
-    List<String> brokersForTenant = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTenantName);
-    if (brokersForTenant.isEmpty()) {
-      throw new InvalidTableConfigException(
-          "Broker tenant: " + brokerTenantName + " does not exist for table: " + tableNameWithType);
-    }
-    String serverTenantName =
-        TagNameUtils.getTagFromTenantAndServerType(tenantConfig.getServer(), tableType.getServerType());
-    if (HelixHelper.getInstancesWithTag(_helixZkManager, serverTenantName).isEmpty()) {
-      throw new InvalidTableConfigException(
-          "Server tenant: " + serverTenantName + " does not exist for table: " + tableNameWithType);
-    }
-    TagOverrideConfig tagOverrideConfig = tenantConfig.getTagOverrideConfig();
-    if (tagOverrideConfig != null) {
-      String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
-      if (realtimeConsumingTag != null) {
-        if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
-          throw new InvalidTableConfigException(
-              "Invalid realtime consuming tag: " + realtimeConsumingTag + ". Must have suffix _REALTIME or _OFFLINE");
-        }
-        if (HelixHelper.getInstancesWithTag(_helixZkManager, realtimeConsumingTag).isEmpty()) {
-          throw new InvalidTableConfigException(
-              "No instances found with overridden realtime consuming tag: " + realtimeConsumingTag + " for table: "
-                  + tableNameWithType);
-        }
-      }
-
-      String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
-      if (realtimeCompletedTag != null) {
-        if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
-          throw new InvalidTableConfigException(
-              "Invalid realtime completed tag: " + realtimeCompletedTag + ". Must have suffix _REALTIME or _OFFLINE");
-        }
-        if (HelixHelper.getInstancesWithTag(_helixZkManager, realtimeCompletedTag).isEmpty()) {
-          throw new InvalidTableConfigException(
-              "No instances found with overridden realtime completed tag: " + realtimeCompletedTag + " for table: "
-                  + tableNameWithType);
-        }
-      }
     }
+    validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
 
     SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig();
     switch (tableType) {
@@ -1160,7 +1121,67 @@ public class PinotHelixResourceManager {
         throw new InvalidTableConfigException("UnSupported table type: " + tableType);
     }
 
-    handleBrokerResource(tableNameWithType, brokersForTenant);
+    String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
+    handleBrokerResource(tableNameWithType, HelixHelper.getInstancesWithTag(_helixZkManager, brokerTenantName));
+  }
+
+  /**
+   * Validates the tenant config for the table
+   */
+  @VisibleForTesting
+  protected void validateTableTenantConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType) {
+    if (tableConfig == null) {
+      throw new InvalidTableConfigException(
+          "Table config is null for table: " + tableNameWithType);
+    }
+    TenantConfig tenantConfig = tableConfig.getTenantConfig();
+    if (tenantConfig == null || tenantConfig.getBroker() == null || tenantConfig.getServer() == null) {
+      throw new InvalidTableConfigException(
+          "Tenant is not configured for table: " + tableNameWithType);
+    }
+    // Check if tenant exists before creating the table
+    String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
+    List<String> brokersForTenant = getInstancesWithTag(brokerTenantName);
+    if (brokersForTenant.isEmpty()) {
+      throw new InvalidTableConfigException(
+          "Broker tenant: " + brokerTenantName + " does not exist for table: " + tableNameWithType);
+    }
+    String serverTenantName =
+        TagNameUtils.getTagFromTenantAndServerType(tenantConfig.getServer(), tableType.getServerType());
+    if (getInstancesWithTag(serverTenantName).isEmpty()) {
+      throw new InvalidTableConfigException(
+          "Server tenant: " + serverTenantName + " does not exist for table: " + tableNameWithType);
+    }
+    TagOverrideConfig tagOverrideConfig = tenantConfig.getTagOverrideConfig();
+    if (tagOverrideConfig != null) {
+      String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
+      if (realtimeConsumingTag != null) {
+        if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
+          throw new InvalidTableConfigException(
+              "Invalid realtime consuming tag: " + realtimeConsumingTag + " for table " + tableNameWithType
+                  + ". Must have suffix _REALTIME or _OFFLINE");
+        }
+        if (getInstancesWithTag(realtimeConsumingTag).isEmpty()) {
+          throw new InvalidTableConfigException(
+              "No instances found with overridden realtime consuming tag: " + realtimeConsumingTag + " for table: "
+                  + tableNameWithType);
+        }
+      }
+
+      String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
+      if (realtimeCompletedTag != null) {
+        if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
+          throw new InvalidTableConfigException(
+              "Invalid realtime completed tag: " + realtimeCompletedTag + " for table " + tableNameWithType
+                  + ". Must have suffix _REALTIME or _OFFLINE");
+        }
+        if (getInstancesWithTag(realtimeCompletedTag).isEmpty()) {
+          throw new InvalidTableConfigException(
+              "No instances found with overridden realtime completed tag: " + realtimeCompletedTag + " for table: "
+                  + tableNameWithType);
+        }
+      }
+    }
   }
 
   /**
@@ -1272,19 +1293,34 @@ public class PinotHelixResourceManager {
     }
   }
 
-  public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type)
+  /**
+   * Validate the table config and update it
+   * @throws IOException
+   */
+  public void updateTableConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType)
       throws IOException {
-    if (type == TableType.REALTIME) {
-      ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
-      ensureRealtimeClusterIsSetUp(config, tableNameWithType, config.getIndexingConfig());
-    } else if (type == TableType.OFFLINE) {
+
+    validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+    setExistingTableConfig(tableConfig, tableNameWithType, tableType);
+  }
+
+  /**
+   * Sets the given table config into zookeeper
+   */
+  public void setExistingTableConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType)
+      throws IOException {
+
+    if (tableType == TableType.REALTIME) {
+      ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
+      ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, tableConfig.getIndexingConfig());
+    } else if (tableType == TableType.OFFLINE) {
       // Update replica group partition assignment to the property store if applicable
-      updateReplicaGroupPartitionAssignment(config);
+      updateReplicaGroupPartitionAssignment(tableConfig);
 
-      ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
+      ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
       IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
-      final String configReplication = config.getValidationConfig().getReplication();
-      if (configReplication != null && !config.getValidationConfig().getReplication()
+      final String configReplication = tableConfig.getValidationConfig().getReplication();
+      if (configReplication != null && !tableConfig.getValidationConfig().getReplication()
           .equals(idealState.getReplicas())) {
         HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, new Function<IdealState, IdealState>() {
           @Nullable
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 11459fd..573ab90 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -31,12 +31,15 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.config.TagOverrideConfig;
 import org.apache.pinot.common.config.Tenant;
+import org.apache.pinot.common.config.TenantConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.TenantRole;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
@@ -48,6 +51,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*;
+
 
 public class PinotHelixResourceManagerTest extends ControllerTest {
   private static final int BASE_SERVER_ADMIN_PORT = 10000;
@@ -188,7 +193,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
 
     // Create the table
     TableConfig tableConfig =
-        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
+        new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
             .setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
     _helixResourceManager.addTable(tableConfig);
 
@@ -294,6 +299,165 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
     _helixAdmin.removeInstanceTag(_helixClusterName, testServerInstance, "wrong_tag");
   }
 
+  @Test
+  public void testValidateTenantConfigs() {
+    String tableNameWithType = "testTable_OFFLINE";
+    TableType tableType = TableType.OFFLINE;
+    int numReplica = 2;
+    TableConfig tableConfig = null;
+    String brokerTag = "aBrokerTag";
+    String serverTag = "aServerTag";
+
+    // null table config
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // null tenant config
+    tableConfig = new TableConfig.Builder(tableType).setTableName(tableNameWithType).setNumReplicas(numReplica).build();
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // null broker tenant
+    TenantConfig tenantConfig = new TenantConfig();
+    tenantConfig.setServer(serverTag);
+    tableConfig.setTenantConfig(tenantConfig);
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // null server tenant
+    tenantConfig.setServer(null);
+    tenantConfig.setBroker(brokerTag);
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // empty broker instances list
+    tenantConfig.setServer(serverTag);
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // Create broker tenant on 3 Brokers
+    Tenant brokerTenant =
+        new Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
+    _helixResourceManager.createBrokerTenant(brokerTenant);
+
+    // empty server instances list
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // valid tenant config, null tagOverrideConfig
+    serverTag = SERVER_TENANT_NAME;
+    tenantConfig.setServer(serverTag);
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+    } catch (Exception e2) {
+      Assert.fail("No exceptions expected");
+    }
+
+    // valid tagOverride config
+    TagOverrideConfig tagOverrideConfig = new TagOverrideConfig();
+    tenantConfig.setTagOverrideConfig(tagOverrideConfig);
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+    } catch (Exception e2) {
+      Assert.fail("No exceptions expected");
+    }
+
+    // incorrect realtime consuming tag suffix
+    tagOverrideConfig.setRealtimeConsuming("incorrectTag_XXX");
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // incorrect realtime consuming tag suffix
+    tagOverrideConfig.setRealtimeConsuming("correctTagEmptyList_OFFLINE");
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // incorrect realtime completed tag suffix
+    tagOverrideConfig.setRealtimeConsuming(serverTag + "_OFFLINE");
+    tagOverrideConfig.setRealtimeCompleted("incorrectTag_XXX");
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // empty list in realtime completed
+    tagOverrideConfig.setRealtimeCompleted("correctTagEmptyList_OFFLINE");
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+      Assert.fail("Expected InvalidTableConfigException");
+    } catch (InvalidTableConfigException e1) {
+      // expected
+    } catch (Exception e2) {
+      Assert.fail("Expected InvalidTableConfigException");
+    }
+
+    // all good
+    tagOverrideConfig.setRealtimeCompleted(serverTag + "_OFFLINE");
+    try {
+      _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+    } catch (Exception e2) {
+      Assert.fail("No exceptions expected");
+    }
+
+    for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
+      _helixAdmin
+          .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(brokerTag));
+      _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+    }
+  }
+
   @AfterMethod
   public void cleanUpBrokerTags() {
     // Untag all Brokers for other tests
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index a865aa7..36e67ab 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.common.config.TableTaskConfig;
+import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.utils.KafkaStarterUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
@@ -175,6 +176,16 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return null;
   }
 
+  @Nullable
+  protected  String getServerTenant() {
+    return TagNameUtils.DEFAULT_TENANT_NAME;
+  }
+
+  @Nullable
+  protected String getBrokerTenant() {
+    return TagNameUtils.DEFAULT_TENANT_NAME;
+  }
+
   /**
    * Get the Pinot connection.
    *
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 46315f2..419a3cf 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -100,9 +100,10 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
     String timeType = outgoingTimeUnit.toString();
 
     addRealtimeTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
-        getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, null, null,
-        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
-        getTaskConfig(), getStreamConsumerFactoryClassName());
+        getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
+        getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(),
+        getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(),
+        getStreamConsumerFactoryClassName());
 
     completeTableConfiguration();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org