Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/27 22:35:20 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8355: add a experiment API for upsert heap memory estimation

Jackie-Jiang commented on code in PR #8355:
URL: https://github.com/apache/pinot/pull/8355#discussion_r860286869


##########
pinot-controller/src/test/java/org/apache/pinot/controller/ControllerTestUtils.java:
##########
@@ -471,6 +472,12 @@ public static Schema createDummySchema(String tableName) {
     return schema;
   }
 
+  public static Schema createDummySchemaForUpsertTable(String tableName) {
+    Schema schema = createDummySchema(tableName);
+    schema.setPrimaryKeyColumns(Lists.newArrayList("dimA"));

Review Comment:
   (minor)
   ```suggestion
       schema.setPrimaryKeyColumns(Collections.singletonList("dimA"));
   ```
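
The suggestion swaps Guava's `Lists.newArrayList` for the JDK's `Collections.singletonList`. A minimal sketch of the behavioural difference, using only the JDK: `ArrayList` stands in for the Guava call, and the class and method names are made up for illustration. The singleton list is immutable and fixed at one element, which should be enough for a dummy schema with a single primary key column.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SingletonListSketch {
  // Mutable list, comparable to what Lists.newArrayList("dimA") returns.
  static List<String> mutableKeys() {
    List<String> keys = new ArrayList<>();
    keys.add("dimA");
    return keys;
  }

  // Immutable single-element list, as in the suggested change.
  static List<String> singletonKeys() {
    return Collections.singletonList("dimA");
  }

  // Returns true if the list refuses to accept another element.
  static boolean rejectsAdd(List<String> keys) {
    try {
      keys.add("dimB");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    // Both lists hold the same single element, so they compare equal.
    System.out.println(mutableKeys().equals(singletonKeys()));
    System.out.println(rejectsAdd(mutableKeys()));
    System.out.println(rejectsAdd(singletonKeys()));
  }
}
```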



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {

Review Comment:
   Both `primaryKeySize` and `numPartitions` should be `int`, and we may give them a default value of `-1`
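
One way to read this comment, sketched with the JDK only: if the optional values arrive as `int` with `-1` meaning "not provided", the null checks and `Integer.valueOf` parsing in the method body are no longer needed. In a JAX-RS resource the default would presumably be declared with `@DefaultValue("-1")` next to `@QueryParam`. The class, constant, and method names below are hypothetical and not part of the PR.

```java
final class OptionalIntParamSketch {
  // Sentinel meaning "the caller did not supply this query parameter".
  static final int NOT_PROVIDED = -1;

  // Use the caller-supplied key size when present; otherwise fall back to
  // the size derived from the schema.
  static int resolveBytesPerKey(int primaryKeySize, int derivedFromSchema) {
    return primaryKeySize != NOT_PROVIDED ? primaryKeySize : derivedFromSchema;
  }

  // The partition count is only usable when it was supplied and is positive.
  static boolean hasPartitionInfo(int numPartitions) {
    return numPartitions > 0;
  }

  public static void main(String[] args) {
    System.out.println(resolveBytesPerKey(NOT_PROVIDED, 40));
    System.out.println(resolveBytesPerKey(100, 40));
    System.out.println(hasPartitionInfo(NOT_PROVIDED));
  }
}
```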



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+        if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+          String msg = "Not support data types for primary key columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else if (dt == FieldSpec.DataType.STRING) {
+          String msg = "Missing primary key sizes for String columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          bytesPerKey += dt.size();
+        }
+      }
+      // Java has a 24 bytes array overhead and there's also 8 bytes for the actual array object
+      bytesPerKey += 32;
+    }
+
+    // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
+    int bytesPerValue = 64;
+    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
+    if (comparisonColumn != null) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
+      if (dt == FieldSpec.DataType.STRING || dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST
+          || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for the comparison column";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else {
+        bytesPerValue = 52 + dt.size();

Review Comment:
   Is this correct? When the comparison column is `LONG`, this gives 52 + 8 = 60 bytes, which does not match the 64 bytes used for the default timestamp case
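
The mismatch can be checked with a few lines of arithmetic. This sketch assumes a `LONG` column is 8 bytes wide (`Long.BYTES`) and copies the constants 52 and 64 from the diff; the class and method names are made up for illustration.

```java
final class BytesPerValueCheck {
  // Default used in the diff when no comparison column is configured.
  static final int DEFAULT_BYTES_PER_VALUE = 64;
  // Fixed part used in the diff when a comparison column is configured.
  static final int FIXED_PART = 52;

  // Mirrors the expression "52 + dt.size()" from the diff.
  static int bytesPerValue(int comparisonColumnWidth) {
    return FIXED_PART + comparisonColumnWidth;
  }

  public static void main(String[] args) {
    int withLong = bytesPerValue(Long.BYTES);
    System.out.println("LONG comparison column: " + withLong + " bytes");
    System.out.println("Default: " + DEFAULT_BYTES_PER_VALUE + " bytes");
    System.out.println("Difference: " + (DEFAULT_BYTES_PER_VALUE - withLong) + " bytes");
  }
}
```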



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")

Review Comment:
   ```suggestion
     @Path("/estimateHeapUsage")
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {

Review Comment:
   Let's rename it, since we may add more methods to it in the future
   ```suggestion
   public class PinotUpsertRestletResource {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+        if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+          String msg = "Not support data types for primary key columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else if (dt == FieldSpec.DataType.STRING) {
+          String msg = "Missing primary key sizes for String columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          bytesPerKey += dt.size();
+        }
+      }
+      // Java has a 24 bytes array overhead and there's also 8 bytes for the actual array object
+      bytesPerKey += 32;
+    }
+
+    // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
+    int bytesPerValue = 64;
+    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
+    if (comparisonColumn != null) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
+      if (dt == FieldSpec.DataType.STRING || dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST

Review Comment:
   Same here: check whether the data type `isFixedWidth()`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();

Review Comment:
   You may check `isFixedWidth()` and throw an exception if the data type is not fixed-width
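
A rough sketch of what that check could look like, replacing the per-type comparisons against `JSON`, `LIST`, `MAP`, and `STRING`. The `DataType` enum below is a hypothetical stand-in for Pinot's `FieldSpec.DataType` so that the example compiles on its own; its byte sizes and the exception type are assumptions, not the PR's code.

```java
import java.util.List;

final class FixedWidthKeySketch {
  // Hypothetical stand-in for FieldSpec.DataType; -1 marks a variable-width type.
  enum DataType {
    INT(Integer.BYTES), LONG(Long.BYTES), FLOAT(Float.BYTES), DOUBLE(Double.BYTES), STRING(-1), JSON(-1);

    private final int _size;

    DataType(int size) {
      _size = size;
    }

    boolean isFixedWidth() {
      return _size >= 0;
    }

    int size() {
      if (!isFixedWidth()) {
        throw new IllegalStateException("No fixed size for " + this);
      }
      return _size;
    }
  }

  // Sums the widths of the primary key columns and rejects any type whose
  // size cannot be derived from the schema alone.
  static int sumFixedWidthKeyBytes(List<DataType> primaryKeyTypes) {
    int bytes = 0;
    for (DataType dt : primaryKeyTypes) {
      if (!dt.isFixedWidth()) {
        throw new IllegalArgumentException("Primary key size must be provided for non-fixed-width type: " + dt);
      }
      bytes += dt.size();
    }
    return bytes;
  }
}
```

A single `isFixedWidth()` test also covers variable-width types that the explicit comparisons do not list.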



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,

Review Comment:
   ```suggestion
         @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") long cardinality,
   ```
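
The comment gives no reason for `long`, but one plausible motivation is the formula in the Javadoc, `TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue)`, whose result can exceed the `int` range for large tables. A minimal sketch under assumed numbers: 100 million unique keys, a hypothetical 40 bytes per key, and the 64 bytes per value from the diff. The class and method names are made up.

```java
final class HeapEstimateSketch {
  // TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue),
  // computed in 64-bit arithmetic; multiplyExact throws on long overflow.
  static long totalHeapBytes(long cardinality, int bytesPerKey, int bytesPerValue) {
    return Math.multiplyExact(cardinality, (long) bytesPerKey + bytesPerValue);
  }

  // True when the estimate would not fit in a 32-bit int.
  static boolean overflowsInt(long cardinality, int bytesPerKey, int bytesPerValue) {
    return totalHeapBytes(cardinality, bytesPerKey, bytesPerValue) > Integer.MAX_VALUE;
  }

  public static void main(String[] args) {
    long cardinality = 100_000_000L;
    System.out.println(totalHeapBytes(cardinality, 40, 64));
    System.out.println(overflowsInt(cardinality, 40, 64));
  }
}
```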



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+        if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+          String msg = "Not support data types for primary key columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else if (dt == FieldSpec.DataType.STRING) {
+          String msg = "Missing primary key sizes for String columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          bytesPerKey += dt.size();
+        }
+      }
+      // Java has a 24 bytes array overhead and there's also 8 bytes for the actual array object
+      bytesPerKey += 32;
+    }
+
+    // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
+    int bytesPerValue = 64;
+    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
+    if (comparisonColumn != null) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
+      if (dt == FieldSpec.DataType.STRING || dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST
+          || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for the comparison column";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else {
+        bytesPerValue = 52 + dt.size();
+      }
+    }
+
+    resultData.put("bytesPerKey", bytesPerKey);
+    resultData.put("bytesPerValue", bytesPerValue);
+
+    long primaryKeyCardinality = Long.valueOf(cardinality);
+    long totalKeySpace = bytesPerKey * primaryKeyCardinality;
+    long totalValueSpace = bytesPerValue * primaryKeyCardinality;
+    long totalSpace = totalKeySpace + totalValueSpace;

Review Comment:
   (optional) Here we only calculate the map content size. Should we also consider the map entry size and the array size within the map?
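
A rough sketch of how the entry and table-array overhead raised above could be folded into the estimate. The constants (32 bytes per map node, 4 bytes per table slot, load factor 0.75) are assumptions for a 64-bit HotSpot JVM with compressed references and a hash map that keeps a power-of-two bucket array; they are not measured from Pinot. `UpsertMapOverheadSketch` and its method names are hypothetical.

```java
final class UpsertMapOverheadSketch {
  // Assumed constants (64-bit HotSpot, compressed references); not measured from Pinot.
  static final long ASSUMED_BYTES_PER_ENTRY = 32;      // node header + hash + key/value/next refs, aligned
  static final long ASSUMED_BYTES_PER_TABLE_SLOT = 4;  // one compressed reference per bucket
  static final double ASSUMED_LOAD_FACTOR = 0.75;

  /** Smallest power of two that keeps numKeys at or below the assumed load factor. */
  static long tableSlots(long numKeys) {
    long needed = (long) Math.ceil(numKeys / ASSUMED_LOAD_FACTOR);
    long slots = 1;
    while (slots < needed) {
      slots <<= 1;
    }
    return slots;
  }

  /** Map content (keys + values) plus the assumed per-entry and bucket-array overhead. */
  static long estimateTotalBytes(long numKeys, int bytesPerKey, int bytesPerValue) {
    long contentBytes = numKeys * (bytesPerKey + bytesPerValue);
    long entryBytes = numKeys * ASSUMED_BYTES_PER_ENTRY;
    long tableBytes = tableSlots(numKeys) * ASSUMED_BYTES_PER_TABLE_SLOT;
    return contentBytes + entryBytes + tableBytes;
  }
}
```

With these assumed constants the per-entry overhead is comparable in size to the key and value themselves, so leaving it out may noticeably underestimate the heap usage.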



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java:
##########
@@ -46,6 +46,7 @@ private Constants() {
   public static final String ZOOKEEPER = "Zookeeper";
   public static final String APP_CONFIGS = "AppConfigs";
   public static final String PERIODIC_TASK_TAG = "PeriodicTask";
+  public static final String UPSERT_RESOURCE_TAG = "UpsertResource";

Review Comment:
   ```suggestion
     public static final String UPSERT_RESOURCE_TAG = "Upsert";
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java:
##########
@@ -194,7 +194,7 @@ public String forTableRebalance(String tableName, String tableType, boolean dryR
   }
 
   public String forTableReload(String tableName, TableType tableType, boolean forceDownload) {
-    String query = String.format("reload?type=%s&forceDownload=%s", tableType.name(), forceDownload);
+    String query = String.format("reload?forceDownload=%s&type=%s", forceDownload, tableType.name());

Review Comment:
   Please revert the unrelated changes. I think something was not merged properly.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+        if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+          String msg = "Not support data types for primary key columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else if (dt == FieldSpec.DataType.STRING) {
+          String msg = "Missing primary key sizes for String columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          bytesPerKey += dt.size();
+        }
+      }
+      // Java has a 24 bytes array overhead and there's also 8 bytes for the actual array object
+      bytesPerKey += 32;
+    }
+
+    // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
+    int bytesPerValue = 64;
+    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
+    if (comparisonColumn != null) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
+      if (dt == FieldSpec.DataType.STRING || dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST
+          || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for the comparison column";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else {
+        bytesPerValue = 52 + dt.size();
+      }
+    }
+
+    resultData.put("bytesPerKey", bytesPerKey);
+    resultData.put("bytesPerValue", bytesPerValue);
+
+    long primaryKeyCardinality = Long.valueOf(cardinality);
+    long totalKeySpace = bytesPerKey * primaryKeyCardinality;
+    long totalValueSpace = bytesPerValue * primaryKeyCardinality;
+    long totalSpace = totalKeySpace + totalValueSpace;
+
+    resultData.put("totalKeySpace(bytes)", totalKeySpace);
+    resultData.put("totalValueSpace(bytes)", totalValueSpace);
+    resultData.put("totalSpace(bytes)", totalSpace);
+
+    // Use Partitions, replicas to calculate memoryPerHost for host assignment.
+    if (numPartitionsStr != null) {
+      int numPartitions = Integer.valueOf(numPartitionsStr);
+      if (numPartitions > 0) {
+        int replicasPerPartition = tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+        double memoryPerHost = (totalSpace * replicasPerPartition * 1.0) / numPartitions;
+        resultData.put("numPartitions", numPartitions);
+        resultData.put("replicasPerPartition", replicasPerPartition);
+        resultData.put("memoryPerHost", memoryPerHost);

Review Comment:
   Should this key be named `totalSpacePerPartition(bytes)`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+        if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+          String msg = "Not support data types for primary key columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else if (dt == FieldSpec.DataType.STRING) {
+          String msg = "Missing primary key sizes for String columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          bytesPerKey += dt.size();
+        }
+      }
+      // Java has a 24 bytes array overhead and there's also 8 bytes for the actual array object
+      bytesPerKey += 32;
+    }
+
+    // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
+    int bytesPerValue = 64;
+    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
+    if (comparisonColumn != null) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
+      if (dt == FieldSpec.DataType.STRING || dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST
+          || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for the comparison column";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else {
+        bytesPerValue = 52 + dt.size();
+      }
+    }
+
+    resultData.put("bytesPerKey", bytesPerKey);
+    resultData.put("bytesPerValue", bytesPerValue);
+
+    long primaryKeyCardinality = Long.valueOf(cardinality);
+    long totalKeySpace = bytesPerKey * primaryKeyCardinality;
+    long totalValueSpace = bytesPerValue * primaryKeyCardinality;
+    long totalSpace = totalKeySpace + totalValueSpace;
+
+    resultData.put("totalKeySpace(bytes)", totalKeySpace);
+    resultData.put("totalValueSpace(bytes)", totalValueSpace);
+    resultData.put("totalSpace(bytes)", totalSpace);
+
+    // Use Partitions, replicas to calculate memoryPerHost for host assignment.
+    if (numPartitionsStr != null) {
+      int numPartitions = Integer.valueOf(numPartitionsStr);
+      if (numPartitions > 0) {
+        int replicasPerPartition = tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
+        double memoryPerHost = (totalSpace * replicasPerPartition * 1.0) / numPartitions;
+        resultData.put("numPartitions", numPartitions);
+        resultData.put("replicasPerPartition", replicasPerPartition);

Review Comment:
   We don't need `replicasPerPartition` here because multiple replicas of the same partition won't be assigned to the same host.
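
A minimal sketch of the per-partition figure without the replica factor, using the `totalSpacePerPartition(bytes)` key proposed in the earlier comment. It uses a plain `java.util.Map` instead of the Jackson `ObjectNode` from the PR so that it stays self-contained; `PerPartitionEstimateSketch` and `perPartition` are hypothetical names, and rejecting non-positive partition counts is an assumption (the PR code skips the calculation in that case).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PerPartitionEstimateSketch {
  /**
   * Splits the total estimated space evenly across partitions. The replica count
   * is deliberately not part of the formula.
   */
  static Map<String, Object> perPartition(long totalSpaceBytes, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive: " + numPartitions);
    }
    Map<String, Object> result = new LinkedHashMap<>();
    result.put("numPartitions", numPartitions);
    result.put("totalSpacePerPartition(bytes)", (double) totalSpaceBytes / numPartitions);
    return result;
  }
}
```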



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertCapacityEstimationRestletResource.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.io.IOException;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.UPSERT_RESOURCE_TAG)
+@Path("/")
+public class PinotUpsertCapacityEstimationRestletResource {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotUpsertCapacityEstimationRestletResource.class);
+
+  /**
+   * The API to estimate heap usage for a Pinot upsert table.
+   *
+   * Sample usage: provide tableConfig, tableSchema, and ColumnStats payload.
+   *
+   * The tool calculates heap usage by estimating total Key/Value space based on unique key combinations.
+   * It used the following formula
+   * ```
+   * TotalHeapSize = uniqueCombinations * (BytesPerKey + BytesPerValue).
+   * ```
+   * The following params need to be provided:
+   * ```
+   * -schemaFile, it contains primary key information.
+   * -tableConfigFile, it contains upsertConfig, tablePartitionConfig etc.
+   * -columnStats, which stores column information, collected from kafka or staging pinot table.
+   * ```
+   * For columns stats, we need to gather the following stats
+   * ```
+   * -cardinality, a required information unique combination of primary keys.
+   * -primaryKeySize, it uses for calculating BytesPerKey.
+   * -comparisonColSize, it uses for calculating BytesPerValue.
+   * -partitionNums(optional), it uses for host assignment calculation.
+   * ```
+   */
+  @POST
+  @Path("/heapUsage")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Estimate memory usage for an upsert table", notes =
+      "This API returns the estimated heap usage based on primary key column stats."
+          + " This allows us to estimate table size before onboarding.")
+  public String estimateHeapUsage(String tableSchemaConfigStr,
+      @ApiParam(value = "cardinality in string format", required = true) @QueryParam("cardinality") String cardinality,
+      @ApiParam(value = "primaryKeySize in string format") @QueryParam("primaryKeySize") String primaryKeySize,
+      @ApiParam(value = "numPartitions in string format") @QueryParam("numPartitions") String numPartitionsStr) {
+    ObjectNode resultData = JsonUtils.newObjectNode();
+    TableAndSchemaConfig tableSchemaConfig;
+
+    try {
+      tableSchemaConfig = JsonUtils.stringToObject(tableSchemaConfigStr, TableAndSchemaConfig.class);
+    } catch (IOException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableSchemaConfigs json string: %s", tableSchemaConfigStr),
+          Response.Status.BAD_REQUEST, e);
+    }
+
+    TableConfig tableConfig = tableSchemaConfig.getTableConfig();
+    resultData.put("tableName", tableConfig.getTableName());
+
+    Schema schema = tableSchemaConfig.getSchema();
+
+    // Estimated key space, it contains primary key columns
+    int bytesPerKey = 0;
+    List<String> primaryKeys = schema.getPrimaryKeyColumns();
+
+    if (primaryKeySize != null) {
+      bytesPerKey += Integer.valueOf(primaryKeySize);
+    } else {
+      for (String primaryKey : primaryKeys) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(primaryKey).getDataType();
+        if (dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST || dt == FieldSpec.DataType.MAP) {
+          String msg = "Not support data types for primary key columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else if (dt == FieldSpec.DataType.STRING) {
+          String msg = "Missing primary key sizes for String columns";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          bytesPerKey += dt.size();
+        }
+      }
+      // Java has a 24 bytes array overhead and there's also 8 bytes for the actual array object
+      bytesPerKey += 32;
+    }
+
+    // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
+    int bytesPerValue = 64;
+    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
+    if (comparisonColumn != null) {
+      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
+      if (dt == FieldSpec.DataType.STRING || dt == FieldSpec.DataType.JSON || dt == FieldSpec.DataType.LIST
+          || dt == FieldSpec.DataType.MAP) {
+        String msg = "Not support data types for the comparison column";
+        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+      } else {
+        bytesPerValue = 52 + dt.size();
+      }
+    }
+
+    resultData.put("bytesPerKey", bytesPerKey);
+    resultData.put("bytesPerValue", bytesPerValue);
+
+    long primaryKeyCardinality = Long.valueOf(cardinality);

Review Comment:
   ```suggestion
       long primaryKeyCardinality = Long.parseLong(cardinality);
   ```
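
For context: `Long.valueOf(String)` returns a boxed `Long` that is then auto-unboxed when assigned to a `long`, whereas `Long.parseLong(String)` returns the primitive directly. A small sketch of the suggested parsing follows. The error handling is an assumption (the PR code as shown does not catch `NumberFormatException`), and `parseCardinality` is a hypothetical helper, not part of the PR.

```java
final class CardinalityParsingSketch {
  /** Parses the cardinality query parameter into a primitive long, rejecting bad input. */
  static long parseCardinality(String cardinality) {
    long value;
    try {
      // parseLong avoids the intermediate Long object that Long.valueOf would create.
      value = Long.parseLong(cardinality);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("cardinality is not a valid long: " + cardinality, e);
    }
    if (value < 0) {
      throw new IllegalArgumentException("cardinality must be non-negative: " + value);
    }
    return value;
  }
}
```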



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

