You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/04/15 17:03:11 UTC
[incubator-pinot] branch master updated: Add validations to table
config before updating a table config (#4103)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2bb6a43 Add validations to table config before updating a table config (#4103)
2bb6a43 is described below
commit 2bb6a43114f1645057a6c6a1adcceab381e4559e
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Apr 15 10:03:05 2019 -0700
Add validations to table config before updating a table config (#4103)
---
.../api/resources/PinotTableRestletResource.java | 7 +-
.../helix/core/PinotHelixResourceManager.java | 160 ++++++++++++--------
.../helix/core/PinotHelixResourceManagerTest.java | 166 ++++++++++++++++++++-
.../tests/BaseClusterIntegrationTest.java | 11 ++
.../tests/RealtimeClusterIntegrationTest.java | 7 +-
5 files changed, 284 insertions(+), 67 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 52c0107..92da65a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -102,6 +102,11 @@ public class PinotTableRestletResource {
@Inject
ExecutorService _executorService;
+ /**
+ * API to create a table. Before the table is added, its config is validated: minimum number of replicas,
+ * matching offline and realtime table configs, and existence of the referenced tenants.
+ * @param tableConfigStr
+ */
@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables")
@@ -302,7 +307,7 @@ public class PinotTableRestletResource {
ensureMinReplicas(tableConfig);
verifyTableConfigs(tableConfig);
- _pinotHelixResourceManager.setExistingTableConfig(tableConfig, tableNameWithType, tableType);
+ _pinotHelixResourceManager.updateTableConfig(tableConfig, tableNameWithType, tableType);
} catch (PinotHelixResourceManager.InvalidTableConfigException e) {
String errStr = String.format("Failed to update configuration for %s due to: %s", tableName, e.getMessage());
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR, 1L);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1ad19fc..a13fab3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -335,6 +336,13 @@ public class PinotHelixResourceManager {
}
/**
+ * Get all instances with the given tag
+ */
+ public List<String> getInstancesWithTag(String tag) {
+ return HelixHelper.getInstancesWithTag(_helixZkManager, tag);
+ }
+
+ /**
* Add an instance into the Helix cluster.
*
* @param instance Instance to be added
@@ -1033,69 +1041,22 @@ public class PinotHelixResourceManager {
}
/**
- * Table APIs
- * @throws InvalidTableConfigException
+ * Performs validations of table config and adds the table to zookeeper
+ * @throws InvalidTableConfigException if validations fail
* @throws TableAlreadyExistsException for offline tables only if the table already exists
*/
public void addTable(@Nonnull TableConfig tableConfig)
throws IOException {
final String tableNameWithType = tableConfig.getTableName();
+ TableType tableType = tableConfig.getTableType();
- TenantConfig tenantConfig;
if (isSingleTenantCluster()) {
- tenantConfig = new TenantConfig();
+ TenantConfig tenantConfig = new TenantConfig();
tenantConfig.setBroker(TagNameUtils.DEFAULT_TENANT_NAME);
tenantConfig.setServer(TagNameUtils.DEFAULT_TENANT_NAME);
tableConfig.setTenantConfig(tenantConfig);
- } else {
- tenantConfig = tableConfig.getTenantConfig();
- if (tenantConfig.getBroker() == null || tenantConfig.getServer() == null) {
- throw new InvalidTableConfigException("Tenant is not configured for table: " + tableNameWithType);
- }
- }
-
- // Check if tenant exists before creating the table
- TableType tableType = tableConfig.getTableType();
- String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
- List<String> brokersForTenant = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTenantName);
- if (brokersForTenant.isEmpty()) {
- throw new InvalidTableConfigException(
- "Broker tenant: " + brokerTenantName + " does not exist for table: " + tableNameWithType);
- }
- String serverTenantName =
- TagNameUtils.getTagFromTenantAndServerType(tenantConfig.getServer(), tableType.getServerType());
- if (HelixHelper.getInstancesWithTag(_helixZkManager, serverTenantName).isEmpty()) {
- throw new InvalidTableConfigException(
- "Server tenant: " + serverTenantName + " does not exist for table: " + tableNameWithType);
- }
- TagOverrideConfig tagOverrideConfig = tenantConfig.getTagOverrideConfig();
- if (tagOverrideConfig != null) {
- String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
- if (realtimeConsumingTag != null) {
- if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
- throw new InvalidTableConfigException(
- "Invalid realtime consuming tag: " + realtimeConsumingTag + ". Must have suffix _REALTIME or _OFFLINE");
- }
- if (HelixHelper.getInstancesWithTag(_helixZkManager, realtimeConsumingTag).isEmpty()) {
- throw new InvalidTableConfigException(
- "No instances found with overridden realtime consuming tag: " + realtimeConsumingTag + " for table: "
- + tableNameWithType);
- }
- }
-
- String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
- if (realtimeCompletedTag != null) {
- if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
- throw new InvalidTableConfigException(
- "Invalid realtime completed tag: " + realtimeCompletedTag + ". Must have suffix _REALTIME or _OFFLINE");
- }
- if (HelixHelper.getInstancesWithTag(_helixZkManager, realtimeCompletedTag).isEmpty()) {
- throw new InvalidTableConfigException(
- "No instances found with overridden realtime completed tag: " + realtimeCompletedTag + " for table: "
- + tableNameWithType);
- }
- }
}
+ validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig();
switch (tableType) {
@@ -1160,7 +1121,67 @@ public class PinotHelixResourceManager {
throw new InvalidTableConfigException("UnSupported table type: " + tableType);
}
- handleBrokerResource(tableNameWithType, brokersForTenant);
+ String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker());
+ handleBrokerResource(tableNameWithType, HelixHelper.getInstancesWithTag(_helixZkManager, brokerTenantName));
+ }
+
+ /**
+ * Validates the tenant config for the table
+ */
+ @VisibleForTesting
+ protected void validateTableTenantConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType) {
+ if (tableConfig == null) {
+ throw new InvalidTableConfigException(
+ "Table config is null for table: " + tableNameWithType);
+ }
+ TenantConfig tenantConfig = tableConfig.getTenantConfig();
+ if (tenantConfig == null || tenantConfig.getBroker() == null || tenantConfig.getServer() == null) {
+ throw new InvalidTableConfigException(
+ "Tenant is not configured for table: " + tableNameWithType);
+ }
+ // Check if tenant exists before creating the table
+ String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
+ List<String> brokersForTenant = getInstancesWithTag(brokerTenantName);
+ if (brokersForTenant.isEmpty()) {
+ throw new InvalidTableConfigException(
+ "Broker tenant: " + brokerTenantName + " does not exist for table: " + tableNameWithType);
+ }
+ String serverTenantName =
+ TagNameUtils.getTagFromTenantAndServerType(tenantConfig.getServer(), tableType.getServerType());
+ if (getInstancesWithTag(serverTenantName).isEmpty()) {
+ throw new InvalidTableConfigException(
+ "Server tenant: " + serverTenantName + " does not exist for table: " + tableNameWithType);
+ }
+ TagOverrideConfig tagOverrideConfig = tenantConfig.getTagOverrideConfig();
+ if (tagOverrideConfig != null) {
+ String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
+ if (realtimeConsumingTag != null) {
+ if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
+ throw new InvalidTableConfigException(
+ "Invalid realtime consuming tag: " + realtimeConsumingTag + " for table " + tableNameWithType
+ + ". Must have suffix _REALTIME or _OFFLINE");
+ }
+ if (getInstancesWithTag(realtimeConsumingTag).isEmpty()) {
+ throw new InvalidTableConfigException(
+ "No instances found with overridden realtime consuming tag: " + realtimeConsumingTag + " for table: "
+ + tableNameWithType);
+ }
+ }
+
+ String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
+ if (realtimeCompletedTag != null) {
+ if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
+ throw new InvalidTableConfigException(
+ "Invalid realtime completed tag: " + realtimeCompletedTag + " for table " + tableNameWithType
+ + ". Must have suffix _REALTIME or _OFFLINE");
+ }
+ if (getInstancesWithTag(realtimeCompletedTag).isEmpty()) {
+ throw new InvalidTableConfigException(
+ "No instances found with overridden realtime completed tag: " + realtimeCompletedTag + " for table: "
+ + tableNameWithType);
+ }
+ }
+ }
}
/**
@@ -1272,19 +1293,34 @@ public class PinotHelixResourceManager {
}
}
- public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type)
+ /**
+ * Validates the tenant config of the given table config and then updates the table config
+ * @throws IOException
+ */
+ public void updateTableConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType)
throws IOException {
- if (type == TableType.REALTIME) {
- ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
- ensureRealtimeClusterIsSetUp(config, tableNameWithType, config.getIndexingConfig());
- } else if (type == TableType.OFFLINE) {
+
+ validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ setExistingTableConfig(tableConfig, tableNameWithType, tableType);
+ }
+
+ /**
+ * Sets the given table config into zookeeper
+ */
+ public void setExistingTableConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType)
+ throws IOException {
+
+ if (tableType == TableType.REALTIME) {
+ ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
+ ensureRealtimeClusterIsSetUp(tableConfig, tableNameWithType, tableConfig.getIndexingConfig());
+ } else if (tableType == TableType.OFFLINE) {
// Update replica group partition assignment to the property store if applicable
- updateReplicaGroupPartitionAssignment(config);
+ updateReplicaGroupPartitionAssignment(tableConfig);
- ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
+ ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
- final String configReplication = config.getValidationConfig().getReplication();
- if (configReplication != null && !config.getValidationConfig().getReplication()
+ final String configReplication = tableConfig.getValidationConfig().getReplication();
+ if (configReplication != null && !tableConfig.getValidationConfig().getReplication()
.equals(idealState.getReplicas())) {
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, new Function<IdealState, IdealState>() {
@Nullable
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 11459fd..573ab90 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -31,12 +31,15 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.config.TagOverrideConfig;
import org.apache.pinot.common.config.Tenant;
+import org.apache.pinot.common.config.TenantConfig;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.TenantRole;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
@@ -48,6 +51,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*;
+
public class PinotHelixResourceManagerTest extends ControllerTest {
private static final int BASE_SERVER_ADMIN_PORT = 10000;
@@ -188,7 +193,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
// Create the table
TableConfig tableConfig =
- new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
+ new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
_helixResourceManager.addTable(tableConfig);
@@ -294,6 +299,165 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
_helixAdmin.removeInstanceTag(_helixClusterName, testServerInstance, "wrong_tag");
}
+ @Test
+ public void testValidateTenantConfigs() {
+ String tableNameWithType = "testTable_OFFLINE";
+ TableType tableType = TableType.OFFLINE;
+ int numReplica = 2;
+ TableConfig tableConfig = null;
+ String brokerTag = "aBrokerTag";
+ String serverTag = "aServerTag";
+
+ // null table config
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // null tenant config
+ tableConfig = new TableConfig.Builder(tableType).setTableName(tableNameWithType).setNumReplicas(numReplica).build();
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // null broker tenant
+ TenantConfig tenantConfig = new TenantConfig();
+ tenantConfig.setServer(serverTag);
+ tableConfig.setTenantConfig(tenantConfig);
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // null server tenant
+ tenantConfig.setServer(null);
+ tenantConfig.setBroker(brokerTag);
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // empty broker instances list
+ tenantConfig.setServer(serverTag);
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // Create broker tenant on 3 Brokers
+ Tenant brokerTenant =
+ new Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
+ _helixResourceManager.createBrokerTenant(brokerTenant);
+
+ // empty server instances list
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // valid tenant config, null tagOverrideConfig
+ serverTag = SERVER_TENANT_NAME;
+ tenantConfig.setServer(serverTag);
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ } catch (Exception e2) {
+ Assert.fail("No exceptions expected");
+ }
+
+ // empty tagOverride config (no override tags set)
+ TagOverrideConfig tagOverrideConfig = new TagOverrideConfig();
+ tenantConfig.setTagOverrideConfig(tagOverrideConfig);
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ } catch (Exception e2) {
+ Assert.fail("No exceptions expected");
+ }
+
+ // incorrect realtime consuming tag suffix
+ tagOverrideConfig.setRealtimeConsuming("incorrectTag_XXX");
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // empty list in realtime consuming
+ tagOverrideConfig.setRealtimeConsuming("correctTagEmptyList_OFFLINE");
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // incorrect realtime completed tag suffix
+ tagOverrideConfig.setRealtimeConsuming(serverTag + "_OFFLINE");
+ tagOverrideConfig.setRealtimeCompleted("incorrectTag_XXX");
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // empty list in realtime completed
+ tagOverrideConfig.setRealtimeCompleted("correctTagEmptyList_OFFLINE");
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ Assert.fail("Expected InvalidTableConfigException");
+ } catch (InvalidTableConfigException e1) {
+ // expected
+ } catch (Exception e2) {
+ Assert.fail("Expected InvalidTableConfigException");
+ }
+
+ // all good
+ tagOverrideConfig.setRealtimeCompleted(serverTag + "_OFFLINE");
+ try {
+ _helixResourceManager.validateTableTenantConfig(tableConfig, tableNameWithType, tableType);
+ } catch (Exception e2) {
+ Assert.fail("No exceptions expected");
+ }
+
+ for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
+ _helixAdmin
+ .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(brokerTag));
+ _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ }
+ }
+
@AfterMethod
public void cleanUpBrokerTags() {
// Untag all Brokers for other tests
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index a865aa7..36e67ab 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.config.TableTaskConfig;
+import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.utils.KafkaStarterUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.ZkStarter;
@@ -175,6 +176,16 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
return null;
}
+ @Nullable
+ protected String getServerTenant() {
+ return TagNameUtils.DEFAULT_TENANT_NAME;
+ }
+
+ @Nullable
+ protected String getBrokerTenant() {
+ return TagNameUtils.DEFAULT_TENANT_NAME;
+ }
+
/**
* Get the Pinot connection.
*
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 46315f2..419a3cf 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -100,9 +100,10 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
String timeType = outgoingTimeUnit.toString();
addRealtimeTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
- getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, null, null,
- getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
- getTaskConfig(), getStreamConsumerFactoryClassName());
+ getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
+ getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(),
+ getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(),
+ getStreamConsumerFactoryClassName());
completeTableConfiguration();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org