You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/03/21 00:28:45 UTC

[incubator-pinot] branch master updated: In TableConfig, add checks for mandatory fields (#3993)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eccf573  In TableConfig, add checks for mandatory fields (#3993)
eccf573 is described below

commit eccf573a636de84e60c85cc331fea0afc172c90c
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Mar 20 17:28:41 2019 -0700

    In TableConfig, add checks for mandatory fields (#3993)
    
    Add explicit checks for mandatory fields when serializing/deserializing the table config.
    Without the explicit checks, a missing field causes an NPE, which is unclear and hard to debug.
    
    Also change the serialization APIs to be non-static
    
    Add unit test and integration test for the changes
---
 .../queryquota/TableQueryQuotaManagerTest.java     |  15 +-
 .../broker/routing/TimeBoundaryServiceTest.java    |   3 +-
 .../HighLevelConsumerRoutingTableBuilderTest.java  |   4 +-
 .../LowLevelConsumerRoutingTableBuilderTest.java   |  12 +-
 .../apache/pinot/common/config/TableConfig.java    | 235 ++++++++------
 .../pinot/common/config/TableConfigTest.java       | 336 ++++++++++++---------
 .../resources/PinotTableConfigRestletResource.java | 140 +++++----
 .../api/resources/PinotTableRestletResource.java   |  12 +-
 .../helix/core/PinotHelixResourceManager.java      |  13 +-
 .../controller/util/AutoAddInvertedIndex.java      |   2 +-
 .../resources/PinotTableRestletResourceTest.java   |  34 +--
 .../resources/PinotTenantRestletResourceTest.java  |   2 +-
 .../helix/ControllerInstanceToggleTest.java        |   2 +-
 .../controller/helix/ControllerSentinelTestV2.java |   2 +-
 .../pinot/hadoop/job/DefaultControllerRestApi.java |   2 +-
 .../pinot/hadoop/job/SegmentCreationJob.java       |   2 +-
 .../pinot/integration/tests/ClusterTest.java       |  11 +-
 .../tests/OfflineClusterIntegrationTest.java       |  18 ++
 .../tools/query/comparison/ClusterStarter.java     |   2 +-
 19 files changed, 481 insertions(+), 366 deletions(-)

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
index 5ccd243..1b5d709 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
@@ -146,7 +146,7 @@ public class TableQueryQuotaManagerTest {
             .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
             .setBrokerTenant("testBroker").setServerTenant("testServer").build();
     ZKMetadataProvider
-        .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, TableConfig.toZnRecord(realtimeTableConfig));
+        .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, realtimeTableConfig.toZNRecord());
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
@@ -169,7 +169,7 @@ public class TableQueryQuotaManagerTest {
             .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
             .setBrokerTenant("testBroker").setServerTenant("testServer").build();
     ZKMetadataProvider
-        .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, TableConfig.toZnRecord(realtimeTableConfig));
+        .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, realtimeTableConfig.toZNRecord());
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
@@ -205,9 +205,8 @@ public class TableQueryQuotaManagerTest {
             .setBrokerTenant("testBroker").setServerTenant("testServer").build();
 
     ZKMetadataProvider
-        .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, TableConfig.toZnRecord(realtimeTableConfig));
-    ZKMetadataProvider
-        .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, TableConfig.toZnRecord(offlineTableConfig));
+        .setRealtimeTableConfig(_testPropertyStore, REALTIME_TABLE_NAME, realtimeTableConfig.toZNRecord());
+    ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
 
     // Since each table has 2 online brokers, per broker rate becomes 100.0 / 2 = 50.0
     _tableQueryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
@@ -261,8 +260,7 @@ public class TableQueryQuotaManagerTest {
         new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
             .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
             .setBrokerTenant("testBroker").setServerTenant("testServer").build();
-    ZKMetadataProvider
-        .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, TableConfig.toZnRecord(offlineTableConfig));
+    ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
@@ -280,8 +278,7 @@ public class TableQueryQuotaManagerTest {
         new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setQuotaConfig(quotaConfig)
             .setRetentionTimeUnit("DAYS").setRetentionTimeValue("1").setSegmentPushType("APPEND")
             .setBrokerTenant("testBroker").setServerTenant("testServer").build();
-    ZKMetadataProvider
-        .setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, TableConfig.toZnRecord(offlineTableConfig));
+    ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/TimeBoundaryServiceTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/TimeBoundaryServiceTest.java
index 6058723..31e6839 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/TimeBoundaryServiceTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/TimeBoundaryServiceTest.java
@@ -125,7 +125,6 @@ public class TimeBoundaryServiceTest {
       throws Exception {
     TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
         .setTimeColumnName("timestamp").setTimeType("DAYS").build();
-    ZKMetadataProvider
-        .setOfflineTableConfig(_propertyStore, tableConfig.getTableName(), TableConfig.toZnRecord(tableConfig));
+    ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableConfig.getTableName(), tableConfig.toZNRecord());
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
index 967be45..225f3f8 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
@@ -49,8 +49,8 @@ public class HighLevelConsumerRoutingTableBuilderTest {
 
     Random random = new Random();
 
-    TableConfig tableConfig = new TableConfig();
-    tableConfig.setTableName("tableName");
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName("tableName").build();
     HighLevelConsumerBasedRoutingTableBuilder routingTableBuilder = new HighLevelConsumerBasedRoutingTableBuilder();
     routingTableBuilder.init(new BaseConfiguration(), tableConfig, null, null);
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
index 92da770..23be4be 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
@@ -52,8 +52,8 @@ public class LowLevelConsumerRoutingTableBuilderTest {
     final int ITERATIONS = 50;
     Random random = new Random();
 
-    TableConfig tableConfig = new TableConfig();
-    tableConfig.setTableName("tableName");
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName("tableName").build();
     LowLevelConsumerRoutingTableBuilder routingTableBuilder = new LowLevelConsumerRoutingTableBuilder();
     routingTableBuilder.init(new BaseConfiguration(), tableConfig, null, null);
 
@@ -161,8 +161,8 @@ public class LowLevelConsumerRoutingTableBuilderTest {
     final int ONLINE_SEGMENT_COUNT = 8;
     final int CONSUMING_SEGMENT_COUNT = SEGMENT_COUNT - ONLINE_SEGMENT_COUNT;
 
-    TableConfig tableConfig = new TableConfig();
-    tableConfig.setTableName("tableName");
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName("tableName").build();
     LowLevelConsumerRoutingTableBuilder routingTableBuilder = new LowLevelConsumerRoutingTableBuilder();
     routingTableBuilder.init(new BaseConfiguration(), tableConfig, null, null);
 
@@ -207,8 +207,8 @@ public class LowLevelConsumerRoutingTableBuilderTest {
     final int SEGMENT_COUNT = 10;
     final int ONLINE_SEGMENT_COUNT = 8;
 
-    TableConfig tableConfig = new TableConfig();
-    tableConfig.setTableName("tableName");
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName("tableName").build();
     LowLevelConsumerRoutingTableBuilder routingTableBuilder = new LowLevelConsumerRoutingTableBuilder();
     routingTableBuilder.init(new BaseConfiguration(), tableConfig, null, null);
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
index 3779d2c..2f4ee1c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/TableConfig.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
@@ -51,6 +50,8 @@ public class TableConfig {
   public static final String TASK_CONFIG_KEY = "task";
   public static final String ROUTING_CONFIG_KEY = "routing";
 
+  private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing";
+
   @ConfigKey("name")
   @ConfigDoc(value = "The name for the table.", mandatory = true, exampleValue = "myTable")
   private String _tableName;
@@ -81,15 +82,17 @@ public class TableConfig {
   @NestedConfig
   private RoutingConfig _routingConfig;
 
+  /**
+   * NOTE: DO NOT use this constructor, use builder instead. This constructor is for deserializer only.
+   */
   public TableConfig() {
     // TODO: currently these 2 fields are annotated as non-null. Revisit to see whether that's necessary
     _tenantConfig = new TenantConfig();
     _customConfig = new TableCustomConfig();
   }
 
-  private TableConfig(@Nonnull String tableName, @Nonnull TableType tableType,
-      @Nonnull SegmentsValidationAndRetentionConfig validationConfig, @Nonnull TenantConfig tenantConfig,
-      @Nonnull IndexingConfig indexingConfig, @Nonnull TableCustomConfig customConfig,
+  private TableConfig(String tableName, TableType tableType, SegmentsValidationAndRetentionConfig validationConfig,
+      TenantConfig tenantConfig, IndexingConfig indexingConfig, TableCustomConfig customConfig,
       @Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig) {
     _tableName = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
     _tableType = tableType;
@@ -102,56 +105,63 @@ public class TableConfig {
     _routingConfig = routingConfig;
   }
 
-  // For backward compatible
-  @Deprecated
-  @Nonnull
-  public static TableConfig init(@Nonnull String jsonConfigString)
-      throws IOException {
-    return fromJsonString(jsonConfigString);
-  }
-
   public static TableConfig fromJsonString(String jsonString)
       throws IOException {
-    return fromJSONConfig(JsonUtils.stringToJsonNode(jsonString));
+    return fromJsonConfig(JsonUtils.stringToJsonNode(jsonString));
   }
 
-  @Nonnull
-  public static TableConfig fromJSONConfig(@Nonnull JsonNode jsonConfig)
+  public static TableConfig fromJsonConfig(JsonNode jsonConfig)
       throws IOException {
-    TableType tableType = TableType.valueOf(jsonConfig.get(TABLE_TYPE_KEY).asText().toUpperCase());
-    String tableName = TableNameBuilder.forType(tableType).tableNameWithType(jsonConfig.get(TABLE_NAME_KEY).asText());
+    // Mandatory fields
+    JsonNode jsonTableType = jsonConfig.get(TABLE_TYPE_KEY);
+    Preconditions
+        .checkState(jsonTableType != null && !jsonTableType.isNull(), FIELD_MISSING_MESSAGE_TEMPLATE, TABLE_TYPE_KEY);
+    TableType tableType = TableType.valueOf(jsonTableType.asText().toUpperCase());
+
+    JsonNode jsonTableName = jsonConfig.get(TABLE_NAME_KEY);
+    Preconditions
+        .checkState(jsonTableName != null && !jsonTableName.isNull(), FIELD_MISSING_MESSAGE_TEMPLATE, TABLE_NAME_KEY);
+    String tableName = TableNameBuilder.forType(tableType).tableNameWithType(jsonTableName.asText());
 
     SegmentsValidationAndRetentionConfig validationConfig =
         extractChildConfig(jsonConfig, VALIDATION_CONFIG_KEY, SegmentsValidationAndRetentionConfig.class);
+    Preconditions.checkState(validationConfig != null, FIELD_MISSING_MESSAGE_TEMPLATE, VALIDATION_CONFIG_KEY);
+
     TenantConfig tenantConfig = extractChildConfig(jsonConfig, TENANT_CONFIG_KEY, TenantConfig.class);
+    Preconditions.checkState(tenantConfig != null, FIELD_MISSING_MESSAGE_TEMPLATE, TENANT_CONFIG_KEY);
+
     IndexingConfig indexingConfig = extractChildConfig(jsonConfig, INDEXING_CONFIG_KEY, IndexingConfig.class);
+    Preconditions.checkState(indexingConfig != null, FIELD_MISSING_MESSAGE_TEMPLATE, INDEXING_CONFIG_KEY);
+
     TableCustomConfig customConfig = extractChildConfig(jsonConfig, CUSTOM_CONFIG_KEY, TableCustomConfig.class);
-    QuotaConfig quotaConfig = null;
-    if (jsonConfig.has(QUOTA_CONFIG_KEY)) {
-      quotaConfig = extractChildConfig(jsonConfig, QUOTA_CONFIG_KEY, QuotaConfig.class);
+    Preconditions.checkState(customConfig != null, FIELD_MISSING_MESSAGE_TEMPLATE, CUSTOM_CONFIG_KEY);
+
+    // Optional fields
+    QuotaConfig quotaConfig = extractChildConfig(jsonConfig, QUOTA_CONFIG_KEY, QuotaConfig.class);
+    if (quotaConfig != null) {
       quotaConfig.validate();
     }
-    TableTaskConfig taskConfig = null;
-    if (jsonConfig.has(TASK_CONFIG_KEY)) {
-      taskConfig = extractChildConfig(jsonConfig, TASK_CONFIG_KEY, TableTaskConfig.class);
-    }
-    RoutingConfig routingConfig = null;
-    if (jsonConfig.has(ROUTING_CONFIG_KEY)) {
-      routingConfig = extractChildConfig(jsonConfig, ROUTING_CONFIG_KEY, RoutingConfig.class);
-    }
+
+    TableTaskConfig taskConfig = extractChildConfig(jsonConfig, TASK_CONFIG_KEY, TableTaskConfig.class);
+
+    RoutingConfig routingConfig = extractChildConfig(jsonConfig, ROUTING_CONFIG_KEY, RoutingConfig.class);
 
     return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
         quotaConfig, taskConfig, routingConfig);
   }
 
   /**
-   * Extracts the child config from the table config.
+   * Extracts the child config from the table config. Returns {@code null} if child config does not exist.
    * <p>
    * NOTE: for historical reason, we support two kinds of nested config values: normal json and serialized json string
    */
+  @Nullable
   private static <T> T extractChildConfig(JsonNode jsonConfig, String childConfigKey, Class<T> childConfigClass)
       throws IOException {
     JsonNode childConfigNode = jsonConfig.get(childConfigKey);
+    if (childConfigNode == null || childConfigNode.isNull()) {
+      return null;
+    }
     if (childConfigNode.isObject()) {
       return JsonUtils.jsonNodeToObject(childConfigNode, childConfigClass);
     } else {
@@ -159,54 +169,83 @@ public class TableConfig {
     }
   }
 
-  @Nonnull
-  public static JsonNode toJSONConfig(@Nonnull TableConfig tableConfig) {
+  public ObjectNode toJsonConfig() {
+    validate();
+
     ObjectNode jsonConfig = JsonUtils.newObjectNode();
-    jsonConfig.put(TABLE_NAME_KEY, tableConfig._tableName);
-    jsonConfig.put(TABLE_TYPE_KEY, tableConfig._tableType.toString());
-    jsonConfig.set(VALIDATION_CONFIG_KEY, JsonUtils.objectToJsonNode(tableConfig._validationConfig));
-    jsonConfig.set(TENANT_CONFIG_KEY, JsonUtils.objectToJsonNode(tableConfig._tenantConfig));
-    jsonConfig.set(INDEXING_CONFIG_KEY, JsonUtils.objectToJsonNode(tableConfig._indexingConfig));
-    jsonConfig.set(CUSTOM_CONFIG_KEY, JsonUtils.objectToJsonNode(tableConfig._customConfig));
-    if (tableConfig._quotaConfig != null) {
-      jsonConfig.set(QUOTA_CONFIG_KEY, JsonUtils.objectToJsonNode(tableConfig._quotaConfig));
+
+    // Mandatory fields
+    jsonConfig.put(TABLE_NAME_KEY, _tableName);
+    jsonConfig.put(TABLE_TYPE_KEY, _tableType.toString());
+    jsonConfig.set(VALIDATION_CONFIG_KEY, JsonUtils.objectToJsonNode(_validationConfig));
+    jsonConfig.set(TENANT_CONFIG_KEY, JsonUtils.objectToJsonNode(_tenantConfig));
+    jsonConfig.set(INDEXING_CONFIG_KEY, JsonUtils.objectToJsonNode(_indexingConfig));
+    jsonConfig.set(CUSTOM_CONFIG_KEY, JsonUtils.objectToJsonNode(_customConfig));
+
+    // Optional fields
+    if (_quotaConfig != null) {
+      jsonConfig.set(QUOTA_CONFIG_KEY, JsonUtils.objectToJsonNode(_quotaConfig));
     }
-    if (tableConfig._taskConfig != null) {
-      jsonConfig.set(TASK_CONFIG_KEY, JsonUtils.objectToJsonNode(tableConfig._taskConfig));
+    if (_taskConfig != null) {
+      jsonConfig.set(TASK_CONFIG_KEY, JsonUtils.objectToJsonNode(_taskConfig));
     }
-    if (tableConfig._routingConfig != null) {
-      jsonConfig.set(ROUTING_CONFIG_KEY, JsonUtils.objectToJsonNode(tableConfig._routingConfig));
+    if (_routingConfig != null) {
+      jsonConfig.set(ROUTING_CONFIG_KEY, JsonUtils.objectToJsonNode(_routingConfig));
     }
+
     return jsonConfig;
   }
 
-  @Nonnull
-  public static TableConfig fromZnRecord(@Nonnull ZNRecord znRecord)
+  public String toJsonConfigString() {
+    return toJsonConfig().toString();
+  }
+
+  public static TableConfig fromZnRecord(ZNRecord znRecord)
       throws IOException {
     Map<String, String> simpleFields = znRecord.getSimpleFields();
-    TableType tableType = TableType.valueOf(simpleFields.get(TABLE_TYPE_KEY).toUpperCase());
-    String tableName = TableNameBuilder.forType(tableType).tableNameWithType(simpleFields.get(TABLE_NAME_KEY));
+
+    // Mandatory fields
+    String tableTypeString = simpleFields.get(TABLE_TYPE_KEY);
+    Preconditions.checkState(tableTypeString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TABLE_TYPE_KEY);
+    TableType tableType = TableType.valueOf(tableTypeString.toUpperCase());
+
+    String tableNameString = simpleFields.get(TABLE_NAME_KEY);
+    Preconditions.checkState(tableNameString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TABLE_NAME_KEY);
+    String tableName = TableNameBuilder.forType(tableType).tableNameWithType(tableNameString);
+
+    String validationConfigString = simpleFields.get(VALIDATION_CONFIG_KEY);
+    Preconditions.checkState(validationConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, VALIDATION_CONFIG_KEY);
     SegmentsValidationAndRetentionConfig validationConfig =
-        JsonUtils.stringToObject(simpleFields.get(VALIDATION_CONFIG_KEY), SegmentsValidationAndRetentionConfig.class);
-    TenantConfig tenantConfig = JsonUtils.stringToObject(simpleFields.get(TENANT_CONFIG_KEY), TenantConfig.class);
-    IndexingConfig indexingConfig =
-        JsonUtils.stringToObject(simpleFields.get(INDEXING_CONFIG_KEY), IndexingConfig.class);
-    TableCustomConfig customConfig =
-        JsonUtils.stringToObject(simpleFields.get(CUSTOM_CONFIG_KEY), TableCustomConfig.class);
+        JsonUtils.stringToObject(validationConfigString, SegmentsValidationAndRetentionConfig.class);
+
+    String tenantConfigString = simpleFields.get(TENANT_CONFIG_KEY);
+    Preconditions.checkState(tenantConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TENANT_CONFIG_KEY);
+    TenantConfig tenantConfig = JsonUtils.stringToObject(tenantConfigString, TenantConfig.class);
+
+    String indexingConfigString = simpleFields.get(INDEXING_CONFIG_KEY);
+    Preconditions.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, INDEXING_CONFIG_KEY);
+    IndexingConfig indexingConfig = JsonUtils.stringToObject(indexingConfigString, IndexingConfig.class);
+
+    String customConfigString = simpleFields.get(CUSTOM_CONFIG_KEY);
+    Preconditions.checkState(customConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, CUSTOM_CONFIG_KEY);
+    TableCustomConfig customConfig = JsonUtils.stringToObject(customConfigString, TableCustomConfig.class);
+
+    // Optional fields
     QuotaConfig quotaConfig = null;
     String quotaConfigString = simpleFields.get(QUOTA_CONFIG_KEY);
     if (quotaConfigString != null) {
       quotaConfig = JsonUtils.stringToObject(quotaConfigString, QuotaConfig.class);
       quotaConfig.validate();
     }
+
     TableTaskConfig taskConfig = null;
     String taskConfigString = simpleFields.get(TASK_CONFIG_KEY);
     if (taskConfigString != null) {
       taskConfig = JsonUtils.stringToObject(taskConfigString, TableTaskConfig.class);
     }
-    String routingConfigString = simpleFields.get(ROUTING_CONFIG_KEY);
 
     RoutingConfig routingConfig = null;
+    String routingConfigString = simpleFields.get(ROUTING_CONFIG_KEY);
     if (routingConfigString != null) {
       routingConfig = JsonUtils.stringToObject(routingConfigString, RoutingConfig.class);
     }
@@ -215,84 +254,94 @@ public class TableConfig {
         quotaConfig, taskConfig, routingConfig);
   }
 
-  @Nonnull
-  public static ZNRecord toZnRecord(@Nonnull TableConfig tableConfig) {
-    ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
+  public ZNRecord toZNRecord()
+      throws JsonProcessingException {
+    validate();
+
     Map<String, String> simpleFields = new HashMap<>();
-    simpleFields.put(TABLE_NAME_KEY, tableConfig._tableName);
-    simpleFields.put(TABLE_TYPE_KEY, tableConfig._tableType.toString());
-    try {
-      simpleFields.put(VALIDATION_CONFIG_KEY, JsonUtils.objectToString(tableConfig._validationConfig));
-      simpleFields.put(TENANT_CONFIG_KEY, JsonUtils.objectToString(tableConfig._tenantConfig));
-      simpleFields.put(INDEXING_CONFIG_KEY, JsonUtils.objectToString(tableConfig._indexingConfig));
-      simpleFields.put(CUSTOM_CONFIG_KEY, JsonUtils.objectToString(tableConfig._customConfig));
-      if (tableConfig._quotaConfig != null) {
-        simpleFields.put(QUOTA_CONFIG_KEY, JsonUtils.objectToString(tableConfig._quotaConfig));
-      }
-      if (tableConfig._taskConfig != null) {
-        simpleFields.put(TASK_CONFIG_KEY, JsonUtils.objectToString(tableConfig._taskConfig));
-      }
-      if (tableConfig._routingConfig != null) {
-        simpleFields.put(ROUTING_CONFIG_KEY, JsonUtils.objectToString(tableConfig._routingConfig));
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+
+    // Mandatory fields
+    simpleFields.put(TABLE_NAME_KEY, _tableName);
+    simpleFields.put(TABLE_TYPE_KEY, _tableType.toString());
+    simpleFields.put(VALIDATION_CONFIG_KEY, JsonUtils.objectToString(_validationConfig));
+    simpleFields.put(TENANT_CONFIG_KEY, JsonUtils.objectToString(_tenantConfig));
+    simpleFields.put(INDEXING_CONFIG_KEY, JsonUtils.objectToString(_indexingConfig));
+    simpleFields.put(CUSTOM_CONFIG_KEY, JsonUtils.objectToString(_customConfig));
+
+    // Optional fields
+    if (_quotaConfig != null) {
+      simpleFields.put(QUOTA_CONFIG_KEY, JsonUtils.objectToString(_quotaConfig));
+    }
+    if (_taskConfig != null) {
+      simpleFields.put(TASK_CONFIG_KEY, JsonUtils.objectToString(_taskConfig));
     }
+    if (_routingConfig != null) {
+      simpleFields.put(ROUTING_CONFIG_KEY, JsonUtils.objectToString(_routingConfig));
+    }
+
+    ZNRecord znRecord = new ZNRecord(_tableName);
     znRecord.setSimpleFields(simpleFields);
     return znRecord;
   }
 
-  @Nonnull
+  /**
+   * Validates the table config.
+   * TODO: revisit to see whether all the following fields are mandatory
+   */
+  public void validate() {
+    Preconditions.checkState(_tableName != null, "Table name is missing");
+    Preconditions.checkState(_tableType != null, "Table type is missing");
+    Preconditions.checkState(_validationConfig != null, "Validation config is missing");
+    Preconditions.checkState(_tenantConfig != null, "Tenant config is missing");
+    Preconditions.checkState(_indexingConfig != null, "Indexing config is missing");
+    Preconditions.checkState(_customConfig != null, "Custom config is missing");
+  }
+
   public String getTableName() {
     return _tableName;
   }
 
-  public void setTableName(@Nonnull String tableName) {
+  public void setTableName(String tableName) {
     _tableName = tableName;
   }
 
-  @Nonnull
   public TableType getTableType() {
     return _tableType;
   }
 
-  public void setTableType(@Nonnull TableType tableType) {
+  public void setTableType(TableType tableType) {
     _tableType = tableType;
   }
 
-  @Nonnull
   public SegmentsValidationAndRetentionConfig getValidationConfig() {
     return _validationConfig;
   }
 
-  public void setValidationConfig(@Nonnull SegmentsValidationAndRetentionConfig validationConfig) {
+  public void setValidationConfig(SegmentsValidationAndRetentionConfig validationConfig) {
     _validationConfig = validationConfig;
   }
 
-  @Nonnull
   public TenantConfig getTenantConfig() {
     return _tenantConfig;
   }
 
-  public void setTenantConfig(@Nonnull TenantConfig tenantConfig) {
+  public void setTenantConfig(TenantConfig tenantConfig) {
     _tenantConfig = tenantConfig;
   }
 
-  @Nonnull
   public IndexingConfig getIndexingConfig() {
     return _indexingConfig;
   }
 
-  public void setIndexingConfig(@Nonnull IndexingConfig indexingConfig) {
+  public void setIndexingConfig(IndexingConfig indexingConfig) {
     _indexingConfig = indexingConfig;
   }
 
-  @Nonnull
   public TableCustomConfig getCustomConfig() {
     return _customConfig;
   }
 
-  public void setCustomConfig(@Nonnull TableCustomConfig customConfig) {
+  public void setCustomConfig(TableCustomConfig customConfig) {
     _customConfig = customConfig;
   }
 
@@ -301,7 +350,7 @@ public class TableConfig {
     return _quotaConfig;
   }
 
-  public void setQuotaConfig(@Nullable QuotaConfig quotaConfig) {
+  public void setQuotaConfig(QuotaConfig quotaConfig) {
     _quotaConfig = quotaConfig;
   }
 
@@ -310,7 +359,7 @@ public class TableConfig {
     return _taskConfig;
   }
 
-  public void setTaskConfig(@Nullable TableTaskConfig taskConfig) {
+  public void setTaskConfig(TableTaskConfig taskConfig) {
     _taskConfig = taskConfig;
   }
 
@@ -323,16 +372,10 @@ public class TableConfig {
     _routingConfig = routingConfig;
   }
 
-  @Nonnull
-  public String toJSONConfigString()
-      throws IOException {
-    return toJSONConfig(this).toString();
-  }
-
   @Override
   public String toString() {
     try {
-      return JsonUtils.objectToPrettyString(toJSONConfig(this));
+      return JsonUtils.objectToPrettyString(toJsonConfig());
     } catch (JsonProcessingException e) {
       throw new RuntimeException(e);
     }
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
index 1d5e571..3821094 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
@@ -18,22 +18,96 @@
  */
 package org.apache.pinot.common.config;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.startree.hll.HllConfig;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.*;
+
 
 public class TableConfigTest {
 
   @Test
+  public void testSerializeMandatoryFields()
+      throws Exception {
+    TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable").build();
+    tableConfig.setTableName(null);
+    testSerializeMandatoryFields(tableConfig, "Table name");
+
+    tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable").build();
+    tableConfig.setTableType(null);
+    testSerializeMandatoryFields(tableConfig, "Table type");
+
+    tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable").build();
+    tableConfig.setValidationConfig(null);
+    testSerializeMandatoryFields(tableConfig, "Validation config");
+
+    tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable").build();
+    tableConfig.setTenantConfig(null);
+    testSerializeMandatoryFields(tableConfig, "Tenant config");
+
+    tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable").build();
+    tableConfig.setIndexingConfig(null);
+    testSerializeMandatoryFields(tableConfig, "Indexing config");
+
+    tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable").build();
+    tableConfig.setCustomConfig(null);
+    testSerializeMandatoryFields(tableConfig, "Custom config");
+  }
+
+  private void testSerializeMandatoryFields(TableConfig tableConfig, String expectedMessage)
+      throws Exception {
+    try {
+      tableConfig.toJsonConfig();
+      fail();
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains(expectedMessage));
+    }
+    try {
+      tableConfig.toZNRecord();
+      fail();
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains(expectedMessage));
+    }
+  }
+
+  @Test
+  public void testDeserializeMandatoryFields()
+      throws Exception {
+    TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable").build();
+    ObjectNode jsonTableConfig = tableConfig.toJsonConfig();
+    TableConfig.fromJsonConfig(jsonTableConfig);
+
+    testDeserializeMandatoryFields(jsonTableConfig.deepCopy(), TableConfig.TABLE_TYPE_KEY);
+
+    testDeserializeMandatoryFields(jsonTableConfig.deepCopy(), TableConfig.TABLE_NAME_KEY);
+
+    testDeserializeMandatoryFields(jsonTableConfig.deepCopy(), TableConfig.VALIDATION_CONFIG_KEY);
+
+    testDeserializeMandatoryFields(jsonTableConfig.deepCopy(), TableConfig.TENANT_CONFIG_KEY);
+
+    testDeserializeMandatoryFields(jsonTableConfig.deepCopy(), TableConfig.INDEXING_CONFIG_KEY);
+
+    testDeserializeMandatoryFields(jsonTableConfig.deepCopy(), TableConfig.CUSTOM_CONFIG_KEY);
+  }
+
+  private void testDeserializeMandatoryFields(ObjectNode jsonTableConfig, String mandatoryFieldKey)
+      throws Exception {
+    jsonTableConfig.remove(mandatoryFieldKey);
+    try {
+      TableConfig.fromJsonConfig(jsonTableConfig);
+      fail();
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains(mandatoryFieldKey));
+    }
+  }
+
+  @Test
   public void testSerializeDeserialize()
       throws Exception {
     TableConfig.Builder tableConfigBuilder = new TableConfig.Builder(TableType.OFFLINE).setTableName("myTable");
@@ -41,32 +115,31 @@ public class TableConfigTest {
       // No quota config
       TableConfig tableConfig = tableConfigBuilder.build();
 
-      Assert.assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
-      Assert.assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
-      Assert.assertEquals(tableConfig.getIndexingConfig().getLoadMode(), "HEAP");
-      Assert.assertNull(tableConfig.getQuotaConfig());
+      assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
+      assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
+      assertEquals(tableConfig.getIndexingConfig().getLoadMode(), "HEAP");
+      assertNull(tableConfig.getQuotaConfig());
 
       // Serialize
-      JsonNode jsonTableConfig = TableConfig.toJSONConfig(tableConfig);
+      ObjectNode jsonTableConfig = tableConfig.toJsonConfig();
       // All nested configs should be json objects instead of serialized strings
-      Assert.assertTrue(jsonTableConfig.get(TableConfig.VALIDATION_CONFIG_KEY) instanceof ObjectNode);
-      Assert.assertTrue(jsonTableConfig.get(TableConfig.TENANT_CONFIG_KEY) instanceof ObjectNode);
-      Assert.assertTrue(jsonTableConfig.get(TableConfig.INDEXING_CONFIG_KEY) instanceof ObjectNode);
-      Assert.assertTrue(jsonTableConfig.get(TableConfig.CUSTOM_CONFIG_KEY) instanceof ObjectNode);
+      assertTrue(jsonTableConfig.get(TableConfig.VALIDATION_CONFIG_KEY) instanceof ObjectNode);
+      assertTrue(jsonTableConfig.get(TableConfig.TENANT_CONFIG_KEY) instanceof ObjectNode);
+      assertTrue(jsonTableConfig.get(TableConfig.INDEXING_CONFIG_KEY) instanceof ObjectNode);
+      assertTrue(jsonTableConfig.get(TableConfig.CUSTOM_CONFIG_KEY) instanceof ObjectNode);
 
       // De-serialize
-      TableConfig tableConfigToCompare = TableConfig.fromJSONConfig(jsonTableConfig);
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNull(tableConfigToCompare.getQuotaConfig());
-      Assert.assertNull(tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig());
-      Assert.assertNull(tableConfigToCompare.getValidationConfig().getHllConfig());
-
-      ZNRecord znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNull(tableConfigToCompare.getQuotaConfig());
-      Assert.assertNull(tableConfig.getValidationConfig().getReplicaGroupStrategyConfig());
-      Assert.assertNull(tableConfigToCompare.getValidationConfig().getHllConfig());
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(jsonTableConfig);
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNull(tableConfigToCompare.getQuotaConfig());
+      assertNull(tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig());
+      assertNull(tableConfigToCompare.getValidationConfig().getHllConfig());
+
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNull(tableConfigToCompare.getQuotaConfig());
+      assertNull(tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig());
+      assertNull(tableConfigToCompare.getValidationConfig().getHllConfig());
     }
     {
       // With quota config
@@ -74,103 +147,90 @@ public class TableConfigTest {
       quotaConfig.setStorage("30G");
       TableConfig tableConfig = tableConfigBuilder.setQuotaConfig(quotaConfig).build();
 
-      Assert.assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
-      Assert.assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
-      Assert.assertEquals(tableConfig.getIndexingConfig().getLoadMode(), "HEAP");
-      Assert.assertNotNull(tableConfig.getQuotaConfig());
-      Assert.assertEquals(tableConfig.getQuotaConfig().getStorage(), "30G");
-      Assert.assertNull(tableConfig.getQuotaConfig().getMaxQueriesPerSecond());
+      assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
+      assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
+      assertEquals(tableConfig.getIndexingConfig().getLoadMode(), "HEAP");
+      assertNotNull(tableConfig.getQuotaConfig());
+      assertEquals(tableConfig.getQuotaConfig().getStorage(), "30G");
+      assertNull(tableConfig.getQuotaConfig().getMaxQueriesPerSecond());
 
       // With qps quota
       quotaConfig.setMaxQueriesPerSecond("100.00");
       tableConfig = tableConfigBuilder.setQuotaConfig(quotaConfig).build();
-      Assert.assertNotNull(tableConfig.getQuotaConfig());
-      Assert.assertNotNull(tableConfig.getQuotaConfig().getMaxQueriesPerSecond());
-      Assert.assertEquals(tableConfig.getQuotaConfig().getMaxQueriesPerSecond(), "100.00");
+      assertNotNull(tableConfig.getQuotaConfig());
+      assertNotNull(tableConfig.getQuotaConfig().getMaxQueriesPerSecond());
+      assertEquals(tableConfig.getQuotaConfig().getMaxQueriesPerSecond(), "100.00");
 
       // Serialize then de-serialize
-      TableConfig tableConfigToCompare = TableConfig.fromJSONConfig(TableConfig.toJSONConfig(tableConfig));
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNotNull(tableConfigToCompare.getQuotaConfig());
-      Assert
-          .assertEquals(tableConfigToCompare.getQuotaConfig().getStorage(), tableConfig.getQuotaConfig().getStorage());
-
-      ZNRecord znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNotNull(tableConfigToCompare.getQuotaConfig());
-      Assert
-          .assertEquals(tableConfigToCompare.getQuotaConfig().getStorage(), tableConfig.getQuotaConfig().getStorage());
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNotNull(tableConfigToCompare.getQuotaConfig());
+      assertEquals(tableConfigToCompare.getQuotaConfig().getStorage(), tableConfig.getQuotaConfig().getStorage());
+
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNotNull(tableConfigToCompare.getQuotaConfig());
+      assertEquals(tableConfigToCompare.getQuotaConfig().getStorage(), tableConfig.getQuotaConfig().getStorage());
     }
     {
       // With tenant config
       TableConfig tableConfig =
           tableConfigBuilder.setServerTenant("aServerTenant").setBrokerTenant("aBrokerTenant").build();
 
-      Assert.assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
-      Assert.assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
-      Assert.assertEquals(tableConfig.getIndexingConfig().getLoadMode(), "HEAP");
-      Assert.assertNotNull(tableConfig.getTenantConfig());
-      Assert.assertEquals(tableConfig.getTenantConfig().getServer(), "aServerTenant");
-      Assert.assertEquals(tableConfig.getTenantConfig().getBroker(), "aBrokerTenant");
-      Assert.assertNull(tableConfig.getTenantConfig().getTagOverrideConfig());
+      assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
+      assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
+      assertEquals(tableConfig.getIndexingConfig().getLoadMode(), "HEAP");
+      assertNotNull(tableConfig.getTenantConfig());
+      assertEquals(tableConfig.getTenantConfig().getServer(), "aServerTenant");
+      assertEquals(tableConfig.getTenantConfig().getBroker(), "aBrokerTenant");
+      assertNull(tableConfig.getTenantConfig().getTagOverrideConfig());
 
       // Serialize then de-serialize
-      TableConfig tableConfigToCompare = TableConfig.fromJSONConfig(TableConfig.toJSONConfig(tableConfig));
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNotNull(tableConfigToCompare.getTenantConfig());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
-      Assert.assertNull(tableConfig.getTenantConfig().getTagOverrideConfig());
-
-      ZNRecord znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNotNull(tableConfigToCompare.getTenantConfig());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
-      Assert.assertNull(tableConfig.getTenantConfig().getTagOverrideConfig());
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNotNull(tableConfigToCompare.getTenantConfig());
+      assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
+      assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
+      assertNull(tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
+
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNotNull(tableConfigToCompare.getTenantConfig());
+      assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
+      assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
+      assertNull(tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
 
       TagOverrideConfig tagOverrideConfig = new TagOverrideConfig();
       tagOverrideConfig.setRealtimeConsuming("aRTConsumingTag_REALTIME");
       tableConfig = tableConfigBuilder.setTagOverrideConfig(tagOverrideConfig).build();
 
-      Assert.assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
-      Assert.assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
-      Assert.assertNotNull(tableConfig.getTenantConfig());
-      Assert.assertEquals(tableConfig.getTenantConfig().getServer(), "aServerTenant");
-      Assert.assertEquals(tableConfig.getTenantConfig().getBroker(), "aBrokerTenant");
-      Assert.assertNotNull(tableConfig.getTenantConfig().getTagOverrideConfig());
-      Assert.assertEquals(tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming(),
+      assertEquals(tableConfig.getTableName(), "myTable_OFFLINE");
+      assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
+      assertNotNull(tableConfig.getTenantConfig());
+      assertEquals(tableConfig.getTenantConfig().getServer(), "aServerTenant");
+      assertEquals(tableConfig.getTenantConfig().getBroker(), "aBrokerTenant");
+      assertNotNull(tableConfig.getTenantConfig().getTagOverrideConfig());
+      assertEquals(tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeConsuming(),
           "aRTConsumingTag_REALTIME");
-      Assert.assertNull(tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted());
+      assertNull(tableConfig.getTenantConfig().getTagOverrideConfig().getRealtimeCompleted());
 
       // Serialize then de-serialize
-      tableConfigToCompare = TableConfig.fromJSONConfig(TableConfig.toJSONConfig(tableConfig));
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNotNull(tableConfigToCompare.getTenantConfig());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
-      Assert.assertNotNull(tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
-      Assert.assertEquals(tableConfig.getTenantConfig().getTagOverrideConfig(),
+      tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNotNull(tableConfigToCompare.getTenantConfig());
+      assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
+      assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
+      assertNotNull(tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
+      assertEquals(tableConfig.getTenantConfig().getTagOverrideConfig(),
           tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
 
-      znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
-      Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-      Assert.assertNotNull(tableConfigToCompare.getTenantConfig());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
-      Assert
-          .assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
-      Assert.assertNotNull(tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
-      Assert.assertEquals(tableConfig.getTenantConfig().getTagOverrideConfig(),
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
+      assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+      assertNotNull(tableConfigToCompare.getTenantConfig());
+      assertEquals(tableConfigToCompare.getTenantConfig().getServer(), tableConfig.getTenantConfig().getServer());
+      assertEquals(tableConfigToCompare.getTenantConfig().getBroker(), tableConfig.getTenantConfig().getBroker());
+      assertNotNull(tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
+      assertEquals(tableConfig.getTenantConfig().getTagOverrideConfig(),
           tableConfigToCompare.getTenantConfig().getTagOverrideConfig());
     }
     {
@@ -185,36 +245,32 @@ public class TableConfigTest {
       tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupConfig);
 
       // Serialize then de-serialize
-      TableConfig tableConfigToCompare = TableConfig.fromJSONConfig(TableConfig.toJSONConfig(tableConfig));
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
       checkTableConfigWithAssignmentConfig(tableConfig, tableConfigToCompare);
 
-      ZNRecord znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
       checkTableConfigWithAssignmentConfig(tableConfig, tableConfigToCompare);
     }
     {
       // With default StreamConsumptionConfig
       TableConfig tableConfig = tableConfigBuilder.build();
-      Assert.assertEquals(
-          tableConfig.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
+      assertEquals(tableConfig.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
           "UniformStreamPartitionAssignment");
 
       // with streamConsumptionConfig set
       tableConfig =
           tableConfigBuilder.setStreamPartitionAssignmentStrategy("BalancedStreamPartitionAssignment").build();
-      Assert.assertEquals(
-          tableConfig.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
+      assertEquals(tableConfig.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
           "BalancedStreamPartitionAssignment");
 
       // Serialize then de-serialize
-      TableConfig tableConfigToCompare = TableConfig.fromJSONConfig(TableConfig.toJSONConfig(tableConfig));
-      Assert.assertEquals(
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
+      assertEquals(
           tableConfigToCompare.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
           "BalancedStreamPartitionAssignment");
 
-      ZNRecord znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
-      Assert.assertEquals(
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
+      assertEquals(
           tableConfigToCompare.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
           "BalancedStreamPartitionAssignment");
     }
@@ -233,11 +289,10 @@ public class TableConfigTest {
       tableConfig.getIndexingConfig().setStarTreeIndexSpec(starTreeIndexSpec);
 
       // Serialize then de-serialize
-      TableConfig tableConfigToCompare = TableConfig.fromJSONConfig(TableConfig.toJSONConfig(tableConfig));
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
       checkTableConfigWithStarTreeConfig(tableConfig, tableConfigToCompare);
 
-      ZNRecord znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
       checkTableConfigWithStarTreeConfig(tableConfig, tableConfigToCompare);
     }
     {
@@ -253,43 +308,42 @@ public class TableConfigTest {
 
       String hllConfigJson = hllConfig.toJsonString();
       HllConfig newHllConfig = HllConfig.fromJsonString(hllConfigJson);
-      Assert.assertEquals(hllConfig.getColumnsToDeriveHllFields(), newHllConfig.getColumnsToDeriveHllFields());
-      Assert.assertEquals(hllConfig.getHllLog2m(), newHllConfig.getHllLog2m());
-      Assert.assertEquals(hllConfig.getHllDeriveColumnSuffix(), newHllConfig.getHllDeriveColumnSuffix());
+      assertEquals(hllConfig.getColumnsToDeriveHllFields(), newHllConfig.getColumnsToDeriveHllFields());
+      assertEquals(hllConfig.getHllLog2m(), newHllConfig.getHllLog2m());
+      assertEquals(hllConfig.getHllDeriveColumnSuffix(), newHllConfig.getHllDeriveColumnSuffix());
 
       TableConfig tableConfig = tableConfigBuilder.build();
       tableConfig.getValidationConfig().setHllConfig(hllConfig);
 
       // Serialize then de-serialize
-      TableConfig tableConfigToCompare = TableConfig.fromJSONConfig(TableConfig.toJSONConfig(tableConfig));
+      TableConfig tableConfigToCompare = TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
       checkTableConfigWithHllConfig(tableConfig, tableConfigToCompare);
 
-      ZNRecord znRecord = TableConfig.toZnRecord(tableConfig);
-      tableConfigToCompare = TableConfig.fromZnRecord(znRecord);
+      tableConfigToCompare = TableConfig.fromZnRecord(tableConfig.toZNRecord());
       checkTableConfigWithHllConfig(tableConfig, tableConfigToCompare);
     }
   }
 
   private void checkTableConfigWithAssignmentConfig(TableConfig tableConfig, TableConfig tableConfigToCompare) {
     // Check that the segment assignment configuration does exist.
-    Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-    Assert.assertNotNull(tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig());
-    Assert.assertEquals(tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig(),
+    assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+    assertNotNull(tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig());
+    assertEquals(tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig(),
         tableConfig.getValidationConfig().getReplicaGroupStrategyConfig());
 
     // Check that the configurations are correct.
     ReplicaGroupStrategyConfig strategyConfig =
         tableConfigToCompare.getValidationConfig().getReplicaGroupStrategyConfig();
-    Assert.assertTrue(strategyConfig.getMirrorAssignmentAcrossReplicaGroups());
-    Assert.assertEquals(strategyConfig.getNumInstancesPerPartition(), 5);
-    Assert.assertEquals(strategyConfig.getPartitionColumn(), "memberId");
+    assertTrue(strategyConfig.getMirrorAssignmentAcrossReplicaGroups());
+    assertEquals(strategyConfig.getNumInstancesPerPartition(), 5);
+    assertEquals(strategyConfig.getPartitionColumn(), "memberId");
   }
 
   private void checkTableConfigWithStarTreeConfig(TableConfig tableConfig, TableConfig tableConfigToCompare)
       throws Exception {
     // Check that the segment assignment configuration does exist.
-    Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-    Assert.assertNotNull(tableConfigToCompare.getIndexingConfig().getStarTreeIndexSpec());
+    assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+    assertNotNull(tableConfigToCompare.getIndexingConfig().getStarTreeIndexSpec());
 
     // Check that the configurations are correct.
     StarTreeIndexSpec starTreeIndexSpec = tableConfigToCompare.getIndexingConfig().getStarTreeIndexSpec();
@@ -297,24 +351,24 @@ public class TableConfigTest {
     Set<String> dims = new HashSet<>();
     dims.add("dims");
 
-    Assert.assertEquals(starTreeIndexSpec.getDimensionsSplitOrder(), Collections.singletonList("dim"));
-    Assert.assertEquals(starTreeIndexSpec.getMaxLeafRecords(), 5);
-    Assert.assertEquals(starTreeIndexSpec.getSkipMaterializationCardinalityThreshold(), 1);
-    Assert.assertEquals(starTreeIndexSpec.getSkipMaterializationForDimensions(), dims);
-    Assert.assertEquals(starTreeIndexSpec.getSkipStarNodeCreationForDimensions(), dims);
+    assertEquals(starTreeIndexSpec.getDimensionsSplitOrder(), Collections.singletonList("dim"));
+    assertEquals(starTreeIndexSpec.getMaxLeafRecords(), 5);
+    assertEquals(starTreeIndexSpec.getSkipMaterializationCardinalityThreshold(), 1);
+    assertEquals(starTreeIndexSpec.getSkipMaterializationForDimensions(), dims);
+    assertEquals(starTreeIndexSpec.getSkipStarNodeCreationForDimensions(), dims);
 
     starTreeIndexSpec = StarTreeIndexSpec.fromJsonString(starTreeIndexSpec.toJsonString());
-    Assert.assertEquals(starTreeIndexSpec.getDimensionsSplitOrder(), Collections.singletonList("dim"));
-    Assert.assertEquals(starTreeIndexSpec.getMaxLeafRecords(), 5);
-    Assert.assertEquals(starTreeIndexSpec.getSkipMaterializationCardinalityThreshold(), 1);
-    Assert.assertEquals(starTreeIndexSpec.getSkipMaterializationForDimensions(), dims);
-    Assert.assertEquals(starTreeIndexSpec.getSkipStarNodeCreationForDimensions(), dims);
+    assertEquals(starTreeIndexSpec.getDimensionsSplitOrder(), Collections.singletonList("dim"));
+    assertEquals(starTreeIndexSpec.getMaxLeafRecords(), 5);
+    assertEquals(starTreeIndexSpec.getSkipMaterializationCardinalityThreshold(), 1);
+    assertEquals(starTreeIndexSpec.getSkipMaterializationForDimensions(), dims);
+    assertEquals(starTreeIndexSpec.getSkipStarNodeCreationForDimensions(), dims);
   }
 
   private void checkTableConfigWithHllConfig(TableConfig tableConfig, TableConfig tableConfigToCompare) {
     // Check that the segment assignment configuration does exist.
-    Assert.assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
-    Assert.assertNotNull(tableConfigToCompare.getValidationConfig().getHllConfig());
+    assertEquals(tableConfigToCompare.getTableName(), tableConfig.getTableName());
+    assertNotNull(tableConfigToCompare.getValidationConfig().getHllConfig());
 
     // Check that the configurations are correct.
     HllConfig hllConfig = tableConfigToCompare.getValidationConfig().getHllConfig();
@@ -323,8 +377,8 @@ public class TableConfigTest {
     columns.add("column");
     columns.add("column2");
 
-    Assert.assertEquals(hllConfig.getColumnsToDeriveHllFields(), columns);
-    Assert.assertEquals(hllConfig.getHllLog2m(), 9);
-    Assert.assertEquals(hllConfig.getHllDeriveColumnSuffix(), "suffix");
+    assertEquals(hllConfig.getColumnsToDeriveHllFields(), columns);
+    assertEquals(hllConfig.getHllLog2m(), 9);
+    assertEquals(hllConfig.getHllDeriveColumnSuffix(), "suffix");
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
index 2afa2bf..01d5fdf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableConfigRestletResource.java
@@ -94,93 +94,101 @@ public class PinotTableConfigRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/v2/tables")
   public Response createNewTable(String tableConfiguration) {
-    CombinedConfig config = null;
-
     try {
-      config = Deserializer.deserializeFromString(CombinedConfig.class, tableConfiguration);
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while deserializing the table configuration", e);
-      return Response.serverError().entity(e.getMessage()).type(MediaType.TEXT_PLAIN_TYPE).build();
-    }
+      CombinedConfig config;
 
-    if (config == null) {
-      LOGGER.warn("Failed to deserialize the table configuration: {}", tableConfiguration);
-      return Response.serverError().entity("Failed to deserialize the table configuration")
-          .type(MediaType.TEXT_PLAIN_TYPE).build();
-    }
+      try {
+        config = Deserializer.deserializeFromString(CombinedConfig.class, tableConfiguration);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while deserializing the table configuration", e);
+        return Response.serverError().entity(e.getMessage()).type(MediaType.TEXT_PLAIN_TYPE).build();
+      }
 
-    if (config.getSchema() != null) {
-      _resourceManager.addOrUpdateSchema(config.getSchema());
-    }
+      if (config == null) {
+        LOGGER.warn("Failed to deserialize the table configuration: {}", tableConfiguration);
+        return Response.serverError().entity("Failed to deserialize the table configuration")
+            .type(MediaType.TEXT_PLAIN_TYPE).build();
+      }
 
-    if (config.getOfflineTableConfig() != null) {
-      _resourceManager.addTable(config.getOfflineTableConfig());
-    }
+      if (config.getSchema() != null) {
+        _resourceManager.addOrUpdateSchema(config.getSchema());
+      }
 
-    if (config.getRealtimeTableConfig() != null) {
-      _resourceManager.addTable(config.getRealtimeTableConfig());
-    }
+      if (config.getOfflineTableConfig() != null) {
+        _resourceManager.addTable(config.getOfflineTableConfig());
+      }
 
-    return Response.ok().build();
+      if (config.getRealtimeTableConfig() != null) {
+        _resourceManager.addTable(config.getRealtimeTableConfig());
+      }
+
+      return Response.ok().build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
   }
 
   @PUT
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/v2/tables/{tableName}")
   public Response updateTable(String tableConfiguration) {
-    CombinedConfig config = null;
-
     try {
-      config = Deserializer.deserializeFromString(CombinedConfig.class, tableConfiguration);
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while deserializing the table configuration", e);
-      return Response.serverError().entity(e.getMessage()).type(MediaType.TEXT_PLAIN_TYPE).build();
-    }
+      CombinedConfig config;
 
-    if (config == null) {
-      LOGGER.warn("Failed to deserialize the table configuration: {}", tableConfiguration);
-      return Response.serverError().entity("Failed to deserialize the table configuration")
-          .type(MediaType.TEXT_PLAIN_TYPE).build();
-    }
+      try {
+        config = Deserializer.deserializeFromString(CombinedConfig.class, tableConfiguration);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while deserializing the table configuration", e);
+        return Response.serverError().entity(e.getMessage()).type(MediaType.TEXT_PLAIN_TYPE).build();
+      }
 
-    if (config.getSchema() != null) {
-      _resourceManager.addOrUpdateSchema(config.getSchema());
-    }
+      if (config == null) {
+        LOGGER.warn("Failed to deserialize the table configuration: {}", tableConfiguration);
+        return Response.serverError().entity("Failed to deserialize the table configuration")
+            .type(MediaType.TEXT_PLAIN_TYPE).build();
+      }
 
-    if (config.getOfflineTableConfig() != null) {
-      if (_resourceManager.getAllTables().contains(config.getOfflineTableConfig().getTableName())) {
-        try {
-          _resourceManager
-              .setExistingTableConfig(config.getOfflineTableConfig(), config.getOfflineTableConfig().getTableName(),
-                  CommonConstants.Helix.TableType.OFFLINE);
-        } catch (IOException e) {
-          LOGGER.warn("Failed to update the offline table configuration for table {}", e,
-              config.getOfflineTableConfig().getTableName());
-          return Response.serverError().entity("Failed to update the offline table configuration")
-              .type(MediaType.TEXT_PLAIN_TYPE).build();
+      if (config.getSchema() != null) {
+        _resourceManager.addOrUpdateSchema(config.getSchema());
+      }
+
+      if (config.getOfflineTableConfig() != null) {
+        if (_resourceManager.getAllTables().contains(config.getOfflineTableConfig().getTableName())) {
+          try {
+            _resourceManager
+                .setExistingTableConfig(config.getOfflineTableConfig(), config.getOfflineTableConfig().getTableName(),
+                    CommonConstants.Helix.TableType.OFFLINE);
+          } catch (IOException e) {
+            LOGGER.warn("Failed to update the offline table configuration for table {}",
+                config.getOfflineTableConfig().getTableName(), e);
+            return Response.serverError().entity("Failed to update the offline table configuration")
+                .type(MediaType.TEXT_PLAIN_TYPE).build();
+          }
+        } else {
+          _resourceManager.addTable(config.getOfflineTableConfig());
         }
-      } else {
-        _resourceManager.addTable(config.getOfflineTableConfig());
       }
-    }
 
-    if (config.getRealtimeTableConfig() != null) {
-      if (_resourceManager.getAllTables().contains(config.getRealtimeTableConfig().getTableName())) {
-        try {
-          _resourceManager
-              .setExistingTableConfig(config.getRealtimeTableConfig(), config.getRealtimeTableConfig().getTableName(),
-                  CommonConstants.Helix.TableType.REALTIME);
-        } catch (IOException e) {
-          LOGGER.warn("Failed to update the realtime table configuration for table {}", e,
-              config.getRealtimeTableConfig().getTableName());
-          return Response.serverError().entity("Failed to update the realtime table configuration")
-              .type(MediaType.TEXT_PLAIN_TYPE).build();
+      if (config.getRealtimeTableConfig() != null) {
+        if (_resourceManager.getAllTables().contains(config.getRealtimeTableConfig().getTableName())) {
+          try {
+            _resourceManager
+                .setExistingTableConfig(config.getRealtimeTableConfig(), config.getRealtimeTableConfig().getTableName(),
+                    CommonConstants.Helix.TableType.REALTIME);
+          } catch (IOException e) {
+            LOGGER.warn("Failed to update the realtime table configuration for table {}",
+                config.getRealtimeTableConfig().getTableName(), e);
+            return Response.serverError().entity("Failed to update the realtime table configuration")
+                .type(MediaType.TEXT_PLAIN_TYPE).build();
+          }
+        } else {
+          _resourceManager.addTable(config.getRealtimeTableConfig());
         }
-      } else {
-        _resourceManager.addTable(config.getRealtimeTableConfig());
       }
-    }
 
-    return Response.ok().build();
+      return Response.ok().build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 739d958..a3b45e5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -130,7 +130,7 @@ public class PinotTableRestletResource {
       } else if (e instanceof PinotHelixResourceManager.TableAlreadyExistsException) {
         throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e);
       } else {
-        throw e;
+        throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
       }
     }
   }
@@ -172,14 +172,14 @@ public class PinotTableRestletResource {
           && _pinotHelixResourceManager.hasOfflineTable(tableName)) {
         TableConfig tableConfig = _pinotHelixResourceManager.getOfflineTableConfig(tableName);
         Preconditions.checkNotNull(tableConfig);
-        ret.set(CommonConstants.Helix.TableType.OFFLINE.name(), TableConfig.toJSONConfig(tableConfig));
+        ret.set(CommonConstants.Helix.TableType.OFFLINE.name(), tableConfig.toJsonConfig());
       }
 
       if ((tableTypeStr == null || CommonConstants.Helix.TableType.REALTIME.name().equalsIgnoreCase(tableTypeStr))
           && _pinotHelixResourceManager.hasRealtimeTable(tableName)) {
         TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableName);
         Preconditions.checkNotNull(tableConfig);
-        ret.set(CommonConstants.Helix.TableType.REALTIME.name(), TableConfig.toJSONConfig(tableConfig));
+        ret.set(CommonConstants.Helix.TableType.REALTIME.name(), tableConfig.toJsonConfig());
       }
       return ret.toString();
     } catch (Exception e) {
@@ -325,11 +325,9 @@ public class PinotTableRestletResource {
       ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode();
       TableConfig tableConfig = TableConfig.fromJsonString(tableConfigStr);
       if (tableConfig.getTableType() == CommonConstants.Helix.TableType.OFFLINE) {
-        tableConfigValidateStr
-            .set(CommonConstants.Helix.TableType.OFFLINE.name(), TableConfig.toJSONConfig(tableConfig));
+        tableConfigValidateStr.set(CommonConstants.Helix.TableType.OFFLINE.name(), tableConfig.toJsonConfig());
       } else {
-        tableConfigValidateStr
-            .set(CommonConstants.Helix.TableType.REALTIME.name(), TableConfig.toJSONConfig(tableConfig));
+        tableConfigValidateStr.set(CommonConstants.Helix.TableType.REALTIME.name(), tableConfig.toJsonConfig());
       }
       return tableConfigValidateStr.toString();
     } catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 73d7428..5e15bff 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1009,7 +1009,8 @@ public class PinotHelixResourceManager {
    * @throws InvalidTableConfigException
    * @throws TableAlreadyExistsException for offline tables only if the table already exists
    */
-  public void addTable(@Nonnull TableConfig tableConfig) {
+  public void addTable(@Nonnull TableConfig tableConfig)
+      throws IOException {
     final String tableNameWithType = tableConfig.getTableName();
 
     TenantConfig tenantConfig;
@@ -1086,8 +1087,7 @@ public class PinotHelixResourceManager {
         LOGGER.info("successfully added the table : " + tableNameWithType + " to the cluster");
 
         // lets add table configs
-        ZKMetadataProvider
-            .setOfflineTableConfig(_propertyStore, tableNameWithType, TableConfig.toZnRecord(tableConfig));
+        ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
 
         _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
             new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
@@ -1109,8 +1109,7 @@ public class PinotHelixResourceManager {
         }
 
         // lets add table configs
-        ZKMetadataProvider
-            .setRealtimeTableConfig(_propertyStore, tableNameWithType, TableConfig.toZnRecord(tableConfig));
+        ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, tableConfig.toZNRecord());
 
         /*
          * PinotRealtimeSegmentManager sets up watches on table and segment path. When a table gets created,
@@ -1248,13 +1247,13 @@ public class PinotHelixResourceManager {
   public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type)
       throws IOException {
     if (type == TableType.REALTIME) {
-      ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, TableConfig.toZnRecord(config));
+      ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
       ensureRealtimeClusterIsSetUp(config, tableNameWithType, config.getIndexingConfig());
     } else if (type == TableType.OFFLINE) {
       // Update replica group partition assignment to the property store if applicable
       updateReplicaGroupPartitionAssignment(config);
 
-      ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, TableConfig.toZnRecord(config));
+      ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
       IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
       final String configReplication = config.getValidationConfig().getReplication();
       if (configReplication != null && !config.getValidationConfig().getReplication()
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java
index c633091..26a5e46 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java
@@ -339,7 +339,7 @@ public class AutoAddInvertedIndex {
     httpURLConnection.setRequestMethod("PUT");
 
     BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(httpURLConnection.getOutputStream(), "UTF-8"));
-    writer.write(tableConfig.toJSONConfigString());
+    writer.write(tableConfig.toJsonConfigString());
     writer.flush();
 
     BufferedReader reader = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), "UTF-8"));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
index 605c1f5..5dbe230 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -109,7 +109,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     TableConfig offlineTableConfig = _offlineBuilder.build();
     offlineTableConfig.setTableName("bad__table__name");
     try {
-      sendPostRequest(_createTableUrl, offlineTableConfig.toJSONConfigString());
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonConfigString());
       Assert.fail("Creation of an OFFLINE table with two underscores in the table name does not fail");
     } catch (IOException e) {
       // Expected 400 Bad Request
@@ -118,7 +118,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
 
     // Create an OFFLINE table with a valid name which should succeed
     offlineTableConfig.setTableName("valid_table_name");
-    String offlineTableJSONConfigString = offlineTableConfig.toJSONConfigString();
+    String offlineTableJSONConfigString = offlineTableConfig.toJsonConfigString();
     sendPostRequest(_createTableUrl, offlineTableJSONConfigString);
 
     // Create an OFFLINE table that already exists which should fail
@@ -134,7 +134,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     offlineTableConfig.getValidationConfig().setReplication("abc");
     offlineTableConfig.setTableName("invalid_replication_table");
     try {
-      sendPostRequest(_createTableUrl, offlineTableConfig.toJSONConfigString());
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonConfigString());
       Assert.fail("Creation of an invalid OFFLINE table does not fail");
     } catch (IOException e) {
       // Expected 400 Bad Request
@@ -146,7 +146,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     TableConfig realtimeTableConfig = _realtimeBuilder.build();
     realtimeTableConfig.setTableName("bad__table__name");
     try {
-      sendPostRequest(_createTableUrl, realtimeTableConfig.toJSONConfigString());
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonConfigString());
       Assert.fail("Creation of a REALTIME table with two underscores in the table name does not fail");
     } catch (IOException e) {
       // Expected 400 Bad Request
@@ -157,7 +157,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     _realtimeBuilder.setSchemaName("invalidSchemaName");
     TableConfig invalidConfig = _realtimeBuilder.build();
     try {
-      sendPostRequest(_createTableUrl, realtimeTableConfig.toJSONConfigString());
+      sendPostRequest(_createTableUrl, realtimeTableConfig.toJsonConfigString());
       Assert.fail("Creation of a REALTIME table without a valid schema does not fail");
     } catch (IOException e) {
       // Expected 400 Bad Request
@@ -171,13 +171,13 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     _realtimeBuilder.setTableName("RT_TABLE");
     addDummySchema(schemaName);
     TableConfig diffConfig = _realtimeBuilder.build();
-    sendPostRequest(_createTableUrl, diffConfig.toJSONConfigString());
+    sendPostRequest(_createTableUrl, diffConfig.toJsonConfigString());
 
     // Create a REALTIME table with a valid name and schema which should succeed
     _realtimeBuilder.setTableName(REALTIME_TABLE_NAME);
     _realtimeBuilder.setSchemaName(REALTIME_TABLE_NAME);
     TableConfig config = _realtimeBuilder.build();
-    String realtimeTableJSONConfigString = config.toJSONConfigString();
+    String realtimeTableJSONConfigString = config.toJsonConfigString();
     sendPostRequest(_createTableUrl, realtimeTableJSONConfigString);
 
     // TODO: check whether we should allow POST request to create REALTIME table that already exists
@@ -195,7 +195,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
   private void testTableMinReplicationInternal(String tableName, int tableReplication)
       throws Exception {
     String tableJSONConfigString =
-        _offlineBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJSONConfigString();
+        _offlineBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonConfigString();
     sendPostRequest(_createTableUrl, tableJSONConfigString);
     // table creation should succeed
     TableConfig tableConfig = getTableConfig(tableName, "OFFLINE");
@@ -204,7 +204,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
 
     addDummySchema(tableName);
     tableJSONConfigString =
-        _realtimeBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJSONConfigString();
+        _realtimeBuilder.setTableName(tableName).setNumReplicas(tableReplication).build().toJsonConfigString();
     sendPostRequest(_createTableUrl, tableJSONConfigString);
     tableConfig = getTableConfig(tableName, "REALTIME");
     Assert.assertEquals(tableConfig.getValidationConfig().getReplicationNumber(),
@@ -217,7 +217,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
   private TableConfig getTableConfig(String tableName, String tableType)
       throws Exception {
     String tableConfigString = sendGetRequest(_controllerRequestURLBuilder.forTableGet(tableName));
-    return TableConfig.fromJSONConfig(JsonUtils.stringToJsonNode(tableConfigString).get(tableType));
+    return TableConfig.fromJsonConfig(JsonUtils.stringToJsonNode(tableConfigString).get(tableType));
   }
 
   @Test
@@ -225,7 +225,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
       throws Exception {
     String tableName = "updateTC";
     String tableJSONConfigString =
-        _offlineBuilder.setTableName(tableName).setNumReplicas(2).build().toJSONConfigString();
+        _offlineBuilder.setTableName(tableName).setNumReplicas(2).build().toJsonConfigString();
     sendPostRequest(_createTableUrl, tableJSONConfigString);
     // table creation should succeed
     TableConfig tableConfig = getTableConfig(tableName, "OFFLINE");
@@ -236,7 +236,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     tableConfig.getValidationConfig().setRetentionTimeValue("10");
 
     JsonNode jsonResponse = JsonUtils.stringToJsonNode(
-        sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJSONConfigString()));
+        sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonConfigString()));
     Assert.assertTrue(jsonResponse.has("status"));
 
     TableConfig modifiedConfig = getTableConfig(tableName, "OFFLINE");
@@ -245,7 +245,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
 
     // Realtime
     addDummySchema(tableName);
-    tableJSONConfigString = _realtimeBuilder.setTableName(tableName).setNumReplicas(2).build().toJSONConfigString();
+    tableJSONConfigString = _realtimeBuilder.setTableName(tableName).setNumReplicas(2).build().toJsonConfigString();
     sendPostRequest(_createTableUrl, tableJSONConfigString);
     tableConfig = getTableConfig(tableName, "REALTIME");
     Assert.assertEquals(tableConfig.getValidationConfig().getRetentionTimeValue(), "5");
@@ -255,7 +255,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     QuotaConfig quota = new QuotaConfig();
     quota.setStorage("10G");
     tableConfig.setQuotaConfig(quota);
-    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJSONConfigString());
+    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonConfigString());
     modifiedConfig = getTableConfig(tableName, "REALTIME");
     Assert.assertNotNull(modifiedConfig.getQuotaConfig());
     Assert.assertEquals(modifiedConfig.getQuotaConfig().getStorage(), "10G");
@@ -263,7 +263,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
 
     quota.setMaxQueriesPerSecond("100.00");
     tableConfig.setQuotaConfig(quota);
-    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJSONConfigString());
+    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonConfigString());
     modifiedConfig = getTableConfig(tableName, "REALTIME");
     Assert.assertNotNull(modifiedConfig.getQuotaConfig().getMaxQueriesPerSecond());
     Assert.assertEquals(modifiedConfig.getQuotaConfig().getMaxQueriesPerSecond(), "100.00");
@@ -273,7 +273,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
       // table does not exist
       tableConfig.setTableName("noSuchTable_REALTIME");
       sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig("noSuchTable"),
-          tableConfig.toJSONConfigString());
+          tableConfig.toJsonConfigString());
     } catch (Exception e) {
       Assert.assertTrue(e instanceof FileNotFoundException);
       notFoundException = true;
@@ -304,7 +304,7 @@ public class PinotTableRestletResourceTest extends ControllerTest {
     // create the table
     try {
       TableConfig offlineTableConfig = _offlineBuilder.build();
-      sendPostRequest(_createTableUrl, offlineTableConfig.toJSONConfigString());
+      sendPostRequest(_createTableUrl, offlineTableConfig.toJsonConfigString());
     } catch (Exception e) {
       Assert.fail("Failed to create offline table " + tableName + "Error: " + e.getMessage());
     }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResourceTest.java
index 50260e1..645b195 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResourceTest.java
@@ -93,7 +93,7 @@ public class PinotTenantRestletResourceTest extends ControllerTest {
 
     TableConfig offlineTableConfig = _offlineBuilder.build();
     offlineTableConfig.setTableName("mytable_OFFLINE");
-    String offlineTableJSONConfigString = offlineTableConfig.toJSONConfigString();
+    String offlineTableJSONConfigString = offlineTableConfig.toJsonConfigString();
     sendPostRequest(createTableUrl, offlineTableJSONConfigString);
 
     // Try to make sure both kinds of tags work
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
index a0987f2..25855a8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
@@ -59,7 +59,7 @@ public class ControllerInstanceToggleTest extends ControllerTest {
     // Create an offline table
     String tableJSONConfigString =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-            .setNumReplicas(NUM_INSTANCES).build().toJSONConfigString();
+            .setNumReplicas(NUM_INSTANCES).build().toJsonConfigString();
     sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableJSONConfigString);
     Assert.assertEquals(
         _helixAdmin.getResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
index e8f27e5..1411f14 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
@@ -57,7 +57,7 @@ public class ControllerSentinelTestV2 extends ControllerTest {
     String tableName = "testTable";
     String tableJSONConfigString =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName).setNumReplicas(3)
-            .build().toJSONConfigString();
+            .build().toJsonConfigString();
     sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableJSONConfigString);
     Assert.assertEquals(
         _helixAdmin.getResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index efc4896..089ea6e 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -58,7 +58,7 @@ public class DefaultControllerRestApi implements ControllerRestApi {
             .getRetrieveTableConfigHttpURI(pushLocation.getHost(), pushLocation.getPort(), _rawTableName));
         JsonNode offlineJsonTableConfig = JsonUtils.stringToJsonNode(response.getResponse()).get(OFFLINE);
         if (offlineJsonTableConfig != null) {
-          TableConfig offlineTableConfig = TableConfig.fromJSONConfig(offlineJsonTableConfig);
+          TableConfig offlineTableConfig = TableConfig.fromJsonConfig(offlineJsonTableConfig);
           LOGGER.info("Got table config: {}", offlineTableConfig);
           return offlineTableConfig;
         }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
index 6939fd7..010be24 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentCreationJob.java
@@ -157,7 +157,7 @@ public class SegmentCreationJob extends BaseSegmentJob {
     TableConfig tableConfig = getTableConfig();
     if (tableConfig != null) {
       validateTableConfig(tableConfig);
-      jobConf.set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJSONConfigString());
+      jobConf.set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJsonConfigString());
     }
     jobConf.set(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 86b59e0..9dd75c2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -302,7 +302,7 @@ public abstract class ClusterTest extends ControllerTest {
             invertedIndexColumns, bloomFilterColumns, taskConfig);
 
     if (!isUsingNewConfigFormat()) {
-      sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
+      sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonConfigString());
     } else {
       _offlineTableConfig = tableConfig;
     }
@@ -317,7 +317,7 @@ public abstract class ClusterTest extends ControllerTest {
             invertedIndexColumns, bloomFilterColumns, taskConfig);
 
     if (!isUsingNewConfigFormat()) {
-      sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJSONConfigString());
+      sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonConfigString());
     } else {
       _offlineTableConfig = tableConfig;
     }
@@ -325,8 +325,7 @@ public abstract class ClusterTest extends ControllerTest {
 
   private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
       String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig)
-      throws Exception {
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig) {
     return new TableConfig.Builder(Helix.TableType.OFFLINE).setTableName(tableName).setTimeColumnName(timeColumnName)
         .setTimeType(timeType).setNumReplicas(3).setBrokerTenant(brokerTenant).setServerTenant(serverTenant)
         .setLoadMode(loadMode).setSegmentVersion(segmentVersion.toString())
@@ -430,7 +429,7 @@ public abstract class ClusterTest extends ControllerTest {
     _realtimeTableConfig = tableConfig;
 
     if (!isUsingNewConfigFormat()) {
-      sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
+      sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonConfigString());
     }
   }
 
@@ -443,7 +442,7 @@ public abstract class ClusterTest extends ControllerTest {
     config.setBloomFilterColumns(bloomFilterCols);
 
     sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tablename),
-        _realtimeTableConfig.toJSONConfigString());
+        _realtimeTableConfig.toJsonConfigString());
   }
 
   protected void dropRealtimeTable(String tableName)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 3d37abe..172b2fa 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
@@ -33,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -151,6 +153,22 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
+  public void testInvalidTableConfig() {
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName("badTable").build();
+    ObjectNode jsonConfig = tableConfig.toJsonConfig();
+    // Remove a mandatory field
+    jsonConfig.remove(TableConfig.VALIDATION_CONFIG_KEY);
+    try {
+      sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), jsonConfig.toString());
+      fail();
+    } catch (IOException e) {
+      // Should get response code 400 (BAD_REQUEST)
+      assertTrue(e.getMessage().startsWith("Server returned HTTP response code: 400"));
+    }
+  }
+
+  @Test
   public void testInvertedIndexTriggering()
       throws Exception {
     final long numTotalDocs = getCountStarResult();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java
index 8c46a5b..18dedc4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java
@@ -206,7 +206,7 @@ public class ClusterStarter {
     String tableJSONConfigString =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(_tableName)
             .setTimeColumnName(_timeColumnName).setTimeType(_timeUnit).setNumReplicas(3).setBrokerTenant("broker")
-            .setServerTenant("server").build().toJSONConfigString();
+            .setServerTenant("server").build().toJsonConfigString();
     sendPostRequest(ControllerRequestURLBuilder.baseUrl(controllerAddress).forTableCreate(), tableJSONConfigString);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org