You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "somandal (via GitHub)" <gi...@apache.org> on 2023/05/03 20:56:49 UTC

[GitHub] [pinot] somandal commented on a diff in pull request #10636: Adding ArgMin/ArgMax Function

somandal commented on code in PR #10636:
URL: https://github.com/apache/pinot/pull/10636#discussion_r1173079326


##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ChildAggregationFunction.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.IntAggregateResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.IntGroupByResultHolder;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+

Review Comment:
   please add javadocs (for this class and all other newly added classes where it makes sense). Call out the use of the dummy result holders and why it is needed.
   
   Is my understanding correct that these child functions are just a placeholder and only really used to map the parent aggregation result to the correct column in the output schema and the correct child agg function?



##########
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java:
##########
@@ -213,6 +215,8 @@ public static ObjectType getObjectType(Object value) {
         return ObjectType.VarianceTuple;
       } else if (value instanceof PinotFourthMoment) {
         return ObjectType.PinotFourthMoment;
+      } else if (value instanceof org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject) {

Review Comment:
   nit: don't need to use the fully qualified name here



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;

Review Comment:
   Just wondering if it might be cleaner to use an enum here and check the enum's value (0 or 1) instead? Feel free to ignore this suggestion
   
   and nit: rename NOT_NULL_OBJECT -> NON_NULL_OBJECT, rename IS_NULL_OBJECT -> NULL_OBJECT



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "HistogramQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_RECORDS = 2000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String MV_INT_COLUMN = "mvIntColumn";
+  private static final String MV_BYTES_COLUMN = "mvBytesColumn";
+  private static final String MV_STRING_COLUMN = "mvStringColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String GROUP_BY_INT_COLUMN = "groupByIntColumn";
+  private static final String GROUP_BY_MV_INT_COLUMN = "groupByMVIntColumn";
+  private static final String GROUP_BY_INT_COLUMN2 = "groupByIntColumn2";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addMultiValueDimension(MV_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(MV_BYTES_COLUMN, DataType.BYTES)
+      .addMultiValueDimension(MV_STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(GROUP_BY_MV_INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN2, DataType.INT)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return " WHERE intColumn >=  500";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    String[] stringSVVals = new String[]{"a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "a11", "a22"};
+    int j = 1;
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, i);
+      record.putValue(LONG_COLUMN, (long) i - NUM_RECORDS / 2);
+      record.putValue(FLOAT_COLUMN, (float) i * 0.5);
+      record.putValue(DOUBLE_COLUMN, (double) i);
+      record.putValue(MV_INT_COLUMN, Arrays.asList(i, i + 1, i + 2));
+      record.putValue(MV_BYTES_COLUMN, Arrays.asList(String.valueOf(i).getBytes(), String.valueOf(i + 1).getBytes(),
+          String.valueOf(i + 2).getBytes()));
+      record.putValue(MV_STRING_COLUMN, Arrays.asList("a" + i, "a" + i + 1, "a" + i + 2));
+      if (i < 20) {
+        record.putValue(STRING_COLUMN, stringSVVals[i % stringSVVals.length]);
+      } else {
+        record.putValue(STRING_COLUMN, "a33");
+      }
+      record.putValue(GROUP_BY_INT_COLUMN, i % 5);
+      record.putValue(GROUP_BY_MV_INT_COLUMN, Arrays.asList(i % 10, (i + 1) % 10));
+      if (i == j) {
+        j *= 2;
+      }
+      record.putValue(GROUP_BY_INT_COLUMN2, j);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+    QueryRewriterFactory.init(String.join(",", QueryRewriterFactory.DEFAULT_QUERY_REWRITERS_CLASS_NAMES)
+        + ",org.apache.pinot.sql.parsers.rewriter.ArgMinMaxRewriter");
+    ResultRewriterFactory
+        .init("org.apache.pinot.core.query.utils.rewriter.ParentAggregationResultRewriter");
+  }
+
+  @Test
+  public void testAggregationInterSegment() {
+    // Simple inter segment
+    String query = "SELECT arg_max(intColumn, longColumn) FROM testTable";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.size(), 2);
+
+    // Inter segment data type test
+    query = "SELECT arg_max(intColumn, longColumn), arg_max(intColumn, floatColumn), "
+        + "arg_max(intColumn, doubleColumn), arg_min(intColumn, mvIntColumn), "
+        + "arg_min(intColumn, mvStringColumn), arg_min(intColumn, intColumn) FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(resultTable.getDataSchema().getColumnName(0), "argmax([intColumn, longColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(1), "argmax([intColumn, floatColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(2), "argmax([intColumn, doubleColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(3), "argmin([intColumn, mvIntColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(4), "argmin([intColumn, mvStringColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(5), "argmin([intColumn, intColumn])");
+
+    assertEquals(rows.size(), 2);
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.get(0)[1], 999.5F);
+    assertEquals(rows.get(1)[1], 999.5F);
+    assertEquals(rows.get(0)[2], 1999D);
+    assertEquals(rows.get(1)[2], 1999D);
+    assertEquals(rows.get(0)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(1)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(0)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(1)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(0)[5], 0);
+    assertEquals(rows.get(1)[5], 0);
+
+    // Inter segment mix aggregation function with different result length
+    // Inter segment string column comparison test
+    query = "SELECT sum(intColumn), argmin(stringColumn, doubleColumn), argmin(stringColumn, stringColumn), "
+        + "argmin(stringColumn, doubleColumn, doubleColumn) FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], 7996000D);
+    assertEquals(rows.get(0)[1], 8D);
+    assertEquals(rows.get(0)[2], "a11");
+    assertEquals(rows.get(0)[3], 8D);
+
+    assertNull(rows.get(1)[0]);
+    assertEquals(rows.get(1)[1], 18D);
+    assertEquals(rows.get(1)[2], "a11");
+    assertEquals(rows.get(1)[3], 8D);
+
+    assertNull(rows.get(2)[0]);
+    assertEquals(rows.get(2)[1], 8D);
+    assertEquals(rows.get(2)[2], "a11");
+    assertNull(rows.get(2)[3]);
+
+    assertNull(rows.get(3)[0]);
+    assertEquals(rows.get(3)[1], 18D);
+    assertEquals(rows.get(3)[2], "a11");
+    assertNull(rows.get(3)[3]);
+
+    // Inter segment mix aggregation function with CASE statement
+    query = "SELECT argmin(CASE WHEN stringColumn = 'a33' THEN 'b' WHEN stringColumn = 'a22' THEN 'a' ELSE 'c' END"
+        + ", stringColumn), argmin(CASE WHEN stringColumn = 'a33' THEN 'b' WHEN stringColumn = 'a22' THEN 'a' "
+        + "ELSE 'c' END, CASE WHEN stringColumn = 'a33' THEN 'b' WHEN stringColumn = 'a22' THEN 'a' ELSE 'c' END) "
+        + "FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], "a22");
+    assertEquals(rows.get(0)[1], "a");
+    assertEquals(rows.get(1)[0], "a22");
+    assertEquals(rows.get(1)[1], "a");
+
+    //   TODO: The following query results in an exception, fix the support for multi-value bytes
+    //   query = "SELECT arg_min(intColumn, mvBytesColumn) FROM testTable";
+    //
+    //   brokerResponse = getBrokerResponse(query);
+    //   resultTable = brokerResponse.getResultTable();
+    //   rows = resultTable.getRows();
+  }
+
+  @Test
+  public void testEmptyAggregation() {
+    // Inter segment mix aggregation with no documents after filtering
+    String query =
+        "SELECT arg_max(intColumn, longColumn), argmin(CASE WHEN stringColumn = 'a33' THEN 'b' "
+            + "WHEN stringColumn = 'a22' THEN 'a' ELSE 'c' END"
+            + ", stringColumn) FROM testTable where intColumn > 10000";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertNull(rows.get(0)[0]);
+    assertNull(rows.get(0)[1]);
+    assertEquals(resultTable.getDataSchema().getColumnName(0), "argmax([intColumn, longColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(1),
+        "argmin([case(equals(stringColumn,'a33'),equals(stringColumn,'a22'),'b','a','c'), stringColumn])");
+  }
+
+  @Test
+  public void testGroupByInterSegment() {

Review Comment:
   can you add a few group by tests where the MV column is used inside argmin/argmax as either the projection column or the measuring column?



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding
+ * columns of the original ARG_MIN/ARG_MAX. For more syntax details please refer to ParentAggregationFunction,
+ * ChildAggregationFunction and ChildAggregationResultRewriter.
+ */
+public class ArgMinMaxRewriter implements QueryRewriter {
+
+  private static final String ARG_MAX = "argmax";
+  private static final String ARG_MIN = "argmin";
+
+  private static final String ARG_MAX_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MAX;
+  private static final String ARG_MIN_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MIN;
+
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    // This map stores the mapping from the list of measuring expressions to the set of projection expressions
+    HashMap<List<Expression>, Set<Expression>> argMinFunctionMap = new HashMap<>();
+    // This map stores the mapping from the list of measuring expressions to the function ID
+    HashMap<List<Expression>, Integer> argMinFunctionIDMap = new HashMap<>();
+
+    HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap = new HashMap<>();
+    HashMap<List<Expression>, Integer> argMaxFunctionIDMap = new HashMap<>();
+
+    Iterator<Expression> iterator = pinotQuery.getSelectList().iterator();
+    while (iterator.hasNext()) {
+      boolean added = extractAndRewriteArgMinMaxFunctions(iterator.next(), argMaxFunctionMap, argMaxFunctionIDMap,
+          argMinFunctionMap, argMinFunctionIDMap);
+      // Remove the original function if it is not added, meaning it is a duplicate
+      if (!added) {
+        iterator.remove();
+      }
+    }
+
+    appendParentArgMinMaxFunctions(false, pinotQuery.getSelectList(), argMinFunctionMap, argMinFunctionIDMap);
+    appendParentArgMinMaxFunctions(true, pinotQuery.getSelectList(), argMaxFunctionMap, argMaxFunctionIDMap);
+
+    return pinotQuery;
+  }
+
+  /**
+   * This method appends the consolidated ARG_MIN/ARG_MAX functions to the end of the selection list.
+   * The consolidated function call will be in the following format:
+   * ARG_MAX(functionID, numMeasuringColumns, measuringColumn1, measuringColumn2, ...,
+   *  projectionColumn1, projectionColumn2, ...)
+   *  where functionID is the ID of the consolidated function, numMeasuringColumns is the number of measuring
+   *  columns, measuringColumn1, measuringColumn2, ... are the measuring columns, and projectionColumn1,
+   *  projectionColumn2, ... are the projection columns.
+   *  The number of projection columns is the same as the number of ARG_MIN/ARG_MAX functions with the same
+   *  measuring columns.
+   */
+  private void appendParentArgMinMaxFunctions(boolean isMax, List<Expression> selectList,
+      HashMap<List<Expression>, Set<Expression>> argMinMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMinMaxFunctionIDMap) {
+    for (Map.Entry<List<Expression>, Set<Expression>> entry : argMinMaxFunctionMap.entrySet()) {
+      Literal functionID = new Literal();
+      functionID.setLongValue(argMinMaxFunctionIDMap.get(entry.getKey()));
+      Literal numMeasuringColumns = new Literal();
+      numMeasuringColumns.setLongValue(entry.getKey().size());
+
+      Function parentFunction = new Function(isMax ? ARG_MAX_PARENT : ARG_MIN_PARENT);
+      parentFunction.addToOperands(new Expression(ExpressionType.LITERAL).setLiteral(functionID));
+      parentFunction.addToOperands(new Expression(ExpressionType.LITERAL).setLiteral(numMeasuringColumns));
+      for (Expression expression : entry.getKey()) {
+        parentFunction.addToOperands(expression);
+      }
+      for (Expression expression : entry.getValue()) {
+        parentFunction.addToOperands(expression);
+      }
+      selectList.add(new Expression(ExpressionType.FUNCTION).setFunctionCall(parentFunction));
+    }
+  }
+
+  /**
+   * This method extracts the ARG_MIN/ARG_MAX functions from the given expression and rewrites the functions
+   * with the same measuring expressions to use the same function ID.
+   * @return true if the function is not duplicated, false otherwise.
+   */
+  private boolean extractAndRewriteArgMinMaxFunctions(Expression expression,
+      HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMaxFunctionIDMap,
+      HashMap<List<Expression>, Set<Expression>> argMinFunctionMap,
+      HashMap<List<Expression>, Integer> argMinFunctionIDMap) {
+    Function function = expression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    String functionName = function.getOperator();
+    if (!(functionName.equals("argmin") || functionName.equals("argmax"))) {
+      return true;
+    }
+    List<Expression> operands = function.getOperands();
+    List<Expression> argMinMaxMeasuringExpressions = new ArrayList<>();
+    for (int i = 0; i < operands.size() - 1; i++) {
+      argMinMaxMeasuringExpressions.add(operands.get(i));
+    }
+    Expression argMinMaxProjectionExpression = operands.get(operands.size() - 1);
+
+    if (functionName.equals("argmin")) {
+      return updateArgMinMaxFunctionMap(argMinMaxMeasuringExpressions, argMinMaxProjectionExpression, argMinFunctionMap,
+          argMinFunctionIDMap, function);
+    } else {
+      return updateArgMinMaxFunctionMap(argMinMaxMeasuringExpressions, argMinMaxProjectionExpression, argMaxFunctionMap,
+          argMaxFunctionIDMap, function);
+    }
+  }
+
+  /**
+   * This method rewrites the ARG_MIN/ARG_MAX function with the given measuring expressions to use the same
+   * function ID.
+   * @return true if the function is not duplicated, false otherwise.
+   */
+  boolean updateArgMinMaxFunctionMap(List<Expression> argMinMaxMeasuringExpressions,

Review Comment:
   nit: can this be private?
   
   also shouldn't the parameter `HashMap<List<Expression>, Integer> argMaxFunctionIDMap` be called `HashMap<List<Expression>, Integer> argMinMaxFunctionIDMap`



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding
+ * columns of the original ARG_MIN/ARG_MAX. For more syntax details please refer to ParentAggregationFunction,
+ * ChildAggregationFunction and ChildAggregationResultRewriter.
+ */
+public class ArgMinMaxRewriter implements QueryRewriter {
+
+  private static final String ARG_MAX = "argmax";
+  private static final String ARG_MIN = "argmin";
+
+  private static final String ARG_MAX_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MAX;
+  private static final String ARG_MIN_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MIN;
+
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    // This map stores the mapping from the list of measuring expressions to the set of projection expressions
+    HashMap<List<Expression>, Set<Expression>> argMinFunctionMap = new HashMap<>();
+    // This map stores the mapping from the list of measuring expressions to the function ID
+    HashMap<List<Expression>, Integer> argMinFunctionIDMap = new HashMap<>();
+
+    HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap = new HashMap<>();
+    HashMap<List<Expression>, Integer> argMaxFunctionIDMap = new HashMap<>();
+
+    Iterator<Expression> iterator = pinotQuery.getSelectList().iterator();
+    while (iterator.hasNext()) {
+      boolean added = extractAndRewriteArgMinMaxFunctions(iterator.next(), argMaxFunctionMap, argMaxFunctionIDMap,
+          argMinFunctionMap, argMinFunctionIDMap);
+      // Remove the original function if it is not added, meaning it is a duplicate
+      if (!added) {
+        iterator.remove();
+      }
+    }
+
+    appendParentArgMinMaxFunctions(false, pinotQuery.getSelectList(), argMinFunctionMap, argMinFunctionIDMap);
+    appendParentArgMinMaxFunctions(true, pinotQuery.getSelectList(), argMaxFunctionMap, argMaxFunctionIDMap);
+
+    return pinotQuery;
+  }
+
+  /**
+   * This method appends the consolidated ARG_MIN/ARG_MAX functions to the end of the selection list.
+   * The consolidated function call will be in the following format:
+   * ARG_MAX(functionID, numMeasuringColumns, measuringColumn1, measuringColumn2, ...,
+   *  projectionColumn1, projectionColumn2, ...)
+   *  where functionID is the ID of the consolidated function, numMeasuringColumns is the number of measuring
+   *  columns, measuringColumn1, measuringColumn2, ... are the measuring columns, and projectionColumn1,
+   *  projectionColumn2, ... are the projection columns.
+   *  The number of projection columns is the same as the number of ARG_MIN/ARG_MAX functions with the same
+   *  measuring columns.
+   */
+  private void appendParentArgMinMaxFunctions(boolean isMax, List<Expression> selectList,
+      HashMap<List<Expression>, Set<Expression>> argMinMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMinMaxFunctionIDMap) {
+    for (Map.Entry<List<Expression>, Set<Expression>> entry : argMinMaxFunctionMap.entrySet()) {
+      Literal functionID = new Literal();
+      functionID.setLongValue(argMinMaxFunctionIDMap.get(entry.getKey()));
+      Literal numMeasuringColumns = new Literal();
+      numMeasuringColumns.setLongValue(entry.getKey().size());
+
+      Function parentFunction = new Function(isMax ? ARG_MAX_PARENT : ARG_MIN_PARENT);
+      parentFunction.addToOperands(new Expression(ExpressionType.LITERAL).setLiteral(functionID));
+      parentFunction.addToOperands(new Expression(ExpressionType.LITERAL).setLiteral(numMeasuringColumns));
+      for (Expression expression : entry.getKey()) {
+        parentFunction.addToOperands(expression);
+      }
+      for (Expression expression : entry.getValue()) {
+        parentFunction.addToOperands(expression);
+      }
+      selectList.add(new Expression(ExpressionType.FUNCTION).setFunctionCall(parentFunction));
+    }
+  }
+
+  /**
+   * This method extracts the ARG_MIN/ARG_MAX functions from the given expression and rewrites the functions
+   * with the same measuring expressions to use the same function ID.
+   * @return true if the function is not duplicated, false otherwise.
+   */
+  private boolean extractAndRewriteArgMinMaxFunctions(Expression expression,
+      HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMaxFunctionIDMap,
+      HashMap<List<Expression>, Set<Expression>> argMinFunctionMap,
+      HashMap<List<Expression>, Integer> argMinFunctionIDMap) {
+    Function function = expression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    String functionName = function.getOperator();
+    if (!(functionName.equals("argmin") || functionName.equals("argmax"))) {
+      return true;
+    }
+    List<Expression> operands = function.getOperands();
+    List<Expression> argMinMaxMeasuringExpressions = new ArrayList<>();
+    for (int i = 0; i < operands.size() - 1; i++) {
+      argMinMaxMeasuringExpressions.add(operands.get(i));
+    }
+    Expression argMinMaxProjectionExpression = operands.get(operands.size() - 1);
+
+    if (functionName.equals("argmin")) {

Review Comment:
   nit: use constant that were defined here?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the following format:
+ * aggregationFunctionType + FunctionID + CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column names.
+ * TODO: Add support for `AS` clauses.
+ */
+public class ParentAggregationResultRewriter implements ResultRewriter {
+  public ParentAggregationResultRewriter() {
+  }
+
+  public static Map<String, ChildFunctionMapping> createChildFunctionMapping(DataSchema schema, Object[] row) {
+    Map<String, ChildFunctionMapping> childFunctionMapping = new HashMap<>();
+    for (int i = 0; i < schema.size(); i++) {
+      String columnName = schema.getColumnName(i);
+      if (columnName.startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        ParentAggregationFunctionResultObject parent = (ParentAggregationFunctionResultObject) row[i];
+
+        DataSchema nestedSchema = parent.getSchema();
+        for (int j = 0; j < nestedSchema.size(); j++) {
+          String childColumnKey = nestedSchema.getColumnName(j);
+          String originalChildFunctionKey =
+              columnName.substring(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX.length())
+                  + CommonConstants.RewriterConstants.CHILD_KEY_SEPERATOR + childColumnKey;
+          // aggregationFunctionType + childFunctionID + CHILD_KEY_SEPERATOR + childFunctionKeyInParent
+          childFunctionMapping.put(originalChildFunctionKey, new ChildFunctionMapping(parent, j, i));
+        }
+      }
+    }
+    return childFunctionMapping;
+  }
+
+  public RewriterResult rewrite(DataSchema dataSchema, List<Object[]> rows) {
+    int numParentAggregationFunctions = 0;
+    // Count the number of parent aggregation functions
+    for (int i = 0; i < dataSchema.size(); i++) {
+      if (dataSchema.getColumnName(i).startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        numParentAggregationFunctions++;
+      }
+    }
+
+    if (numParentAggregationFunctions == 0 || rows.isEmpty()) {

Review Comment:
   i'd return right away (without calculating `numParentAggregationFunctions`) if rows is empty. avoids the extra step to walk the DataSchema list



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/ParentAggregationFunctionResultObject.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.utils;
+
+import java.io.Serializable;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+public interface ParentAggregationFunctionResultObject

Review Comment:
   Add a javadoc here so it's clearer when and where this interface is used (in case in the future someone wants to extend this for some other functionaity)



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentAggregationFunction.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ParentAggregationFunction<I, F extends ParentAggregationFunctionResultObject>

Review Comment:
   Add javadocs here too



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.

Review Comment:
   nit: is this javadoc right? should probably be arg min/max queries, right?



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "HistogramQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_RECORDS = 2000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String MV_INT_COLUMN = "mvIntColumn";
+  private static final String MV_BYTES_COLUMN = "mvBytesColumn";
+  private static final String MV_STRING_COLUMN = "mvStringColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String GROUP_BY_INT_COLUMN = "groupByIntColumn";
+  private static final String GROUP_BY_MV_INT_COLUMN = "groupByMVIntColumn";
+  private static final String GROUP_BY_INT_COLUMN2 = "groupByIntColumn2";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addMultiValueDimension(MV_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(MV_BYTES_COLUMN, DataType.BYTES)
+      .addMultiValueDimension(MV_STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(GROUP_BY_MV_INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN2, DataType.INT)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return " WHERE intColumn >=  500";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    String[] stringSVVals = new String[]{"a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "a11", "a22"};
+    int j = 1;
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, i);
+      record.putValue(LONG_COLUMN, (long) i - NUM_RECORDS / 2);
+      record.putValue(FLOAT_COLUMN, (float) i * 0.5);
+      record.putValue(DOUBLE_COLUMN, (double) i);
+      record.putValue(MV_INT_COLUMN, Arrays.asList(i, i + 1, i + 2));
+      record.putValue(MV_BYTES_COLUMN, Arrays.asList(String.valueOf(i).getBytes(), String.valueOf(i + 1).getBytes(),
+          String.valueOf(i + 2).getBytes()));
+      record.putValue(MV_STRING_COLUMN, Arrays.asList("a" + i, "a" + i + 1, "a" + i + 2));
+      if (i < 20) {
+        record.putValue(STRING_COLUMN, stringSVVals[i % stringSVVals.length]);
+      } else {
+        record.putValue(STRING_COLUMN, "a33");
+      }
+      record.putValue(GROUP_BY_INT_COLUMN, i % 5);
+      record.putValue(GROUP_BY_MV_INT_COLUMN, Arrays.asList(i % 10, (i + 1) % 10));
+      if (i == j) {
+        j *= 2;
+      }
+      record.putValue(GROUP_BY_INT_COLUMN2, j);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+    QueryRewriterFactory.init(String.join(",", QueryRewriterFactory.DEFAULT_QUERY_REWRITERS_CLASS_NAMES)
+        + ",org.apache.pinot.sql.parsers.rewriter.ArgMinMaxRewriter");
+    ResultRewriterFactory
+        .init("org.apache.pinot.core.query.utils.rewriter.ParentAggregationResultRewriter");
+  }
+
+  @Test
+  public void testAggregationInterSegment() {
+    // Simple inter segment
+    String query = "SELECT arg_max(intColumn, longColumn) FROM testTable";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.size(), 2);
+
+    // Inter segment data type test
+    query = "SELECT arg_max(intColumn, longColumn), arg_max(intColumn, floatColumn), "
+        + "arg_max(intColumn, doubleColumn), arg_min(intColumn, mvIntColumn), "
+        + "arg_min(intColumn, mvStringColumn), arg_min(intColumn, intColumn) FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(resultTable.getDataSchema().getColumnName(0), "argmax([intColumn, longColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(1), "argmax([intColumn, floatColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(2), "argmax([intColumn, doubleColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(3), "argmin([intColumn, mvIntColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(4), "argmin([intColumn, mvStringColumn])");
+    assertEquals(resultTable.getDataSchema().getColumnName(5), "argmin([intColumn, intColumn])");
+
+    assertEquals(rows.size(), 2);
+    assertEquals(rows.get(0)[0], 999L);
+    assertEquals(rows.get(1)[0], 999L);
+    assertEquals(rows.get(0)[1], 999.5F);
+    assertEquals(rows.get(1)[1], 999.5F);
+    assertEquals(rows.get(0)[2], 1999D);
+    assertEquals(rows.get(1)[2], 1999D);
+    assertEquals(rows.get(0)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(1)[3], new Integer[]{0, 1, 2});
+    assertEquals(rows.get(0)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(1)[4], new String[]{"a0", "a01", "a02"});
+    assertEquals(rows.get(0)[5], 0);
+    assertEquals(rows.get(1)[5], 0);
+
+    // Inter segment mix aggregation function with different result length
+    // Inter segment string column comparison test
+    query = "SELECT sum(intColumn), argmin(stringColumn, doubleColumn), argmin(stringColumn, stringColumn), "
+        + "argmin(stringColumn, doubleColumn, doubleColumn) FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], 7996000D);
+    assertEquals(rows.get(0)[1], 8D);
+    assertEquals(rows.get(0)[2], "a11");
+    assertEquals(rows.get(0)[3], 8D);
+
+    assertNull(rows.get(1)[0]);
+    assertEquals(rows.get(1)[1], 18D);
+    assertEquals(rows.get(1)[2], "a11");
+    assertEquals(rows.get(1)[3], 8D);
+
+    assertNull(rows.get(2)[0]);
+    assertEquals(rows.get(2)[1], 8D);
+    assertEquals(rows.get(2)[2], "a11");
+    assertNull(rows.get(2)[3]);
+
+    assertNull(rows.get(3)[0]);
+    assertEquals(rows.get(3)[1], 18D);
+    assertEquals(rows.get(3)[2], "a11");
+    assertNull(rows.get(3)[3]);
+
+    // Inter segment mix aggregation function with CASE statement
+    query = "SELECT argmin(CASE WHEN stringColumn = 'a33' THEN 'b' WHEN stringColumn = 'a22' THEN 'a' ELSE 'c' END"
+        + ", stringColumn), argmin(CASE WHEN stringColumn = 'a33' THEN 'b' WHEN stringColumn = 'a22' THEN 'a' "
+        + "ELSE 'c' END, CASE WHEN stringColumn = 'a33' THEN 'b' WHEN stringColumn = 'a22' THEN 'a' ELSE 'c' END) "
+        + "FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    resultTable = brokerResponse.getResultTable();
+    rows = resultTable.getRows();
+
+    assertEquals(rows.size(), 4);
+
+    assertEquals(rows.get(0)[0], "a22");
+    assertEquals(rows.get(0)[1], "a");
+    assertEquals(rows.get(1)[0], "a22");
+    assertEquals(rows.get(1)[1], "a");
+
+    //   TODO: The following query results in an exception, fix the support for multi-value bytes

Review Comment:
   perhaps uncomment this code and assert for the exception to be thrown. that way if this issue is fixed, this test will fail and enforce that someone fixes the test.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ResultRewriterFactory.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ResultRewriterFactory {
+
+  private ResultRewriterFactory() {
+  }
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ResultRewriterFactory.class);
+  static final List<String> DEFAULT_RESULT_REWRITERS_CLASS_NAMES = ImmutableList.of();

Review Comment:
   is this intentionally left empty? does this mean that if someone wants to use arg min/max they must set the broker config: `pinot.broker.result.rewriter.class.names`



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;

Review Comment:
   the code marks `_mutable` as true when calling this constructor `ArgMinMaxObject(DataSchema keySchema, DataSchema valSchema)` and false for the `ByteBuffer` constructor. Does this comment need to be updated or the flag needs to be reversed? it's a bit confusing



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -972,4 +974,11 @@ public static class Range {
   public static class IdealState {
     public static final String HYBRID_TABLE_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";
   }
+
+  public static class RewriterConstants {
+    public static final String PARENT_AGGREGATION_NAME_PREFIX = "pinotparentaggregation";
+    public static final String CHILD_AGGREGATION_NAME_PREFIX = "pinotchildaggregation";

Review Comment:
   nit: can these values be camel cased? e.g. pinotchildaggregation -> pinotChildAggregation



##########
pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriterTest.java:
##########
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class ArgMinMaxRewriterTest {
+  private static final QueryRewriter QUERY_REWRITER = new ArgMinMaxRewriter();
+
+  @Test
+  public void testQueryRewrite() {

Review Comment:
   Have some suggestions for enhancing the tests here:
   
   - can you also add a few group by queries here?
   - also add some examples where the measuring columns are different?
   - Can you also have one example where the same columns are used in the measuring expressions list, but the order is swapped (e.g. ARG_MIN(col1, col2, col3), ARG_MIN(col2, col1, col4))?
   
   I'd especially like to see examples of where the function IDs are non-0



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the following format:
+ * aggregationFunctionType + FunctionID + CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column names.
+ * TODO: Add support for `AS` clauses.
+ */
+public class ParentAggregationResultRewriter implements ResultRewriter {
+  public ParentAggregationResultRewriter() {
+  }
+
+  public static Map<String, ChildFunctionMapping> createChildFunctionMapping(DataSchema schema, Object[] row) {

Review Comment:
   nit: does this need to be public? i only see it used in this class



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/QueryRewriterFactory.java:
##########
@@ -33,7 +33,7 @@ private QueryRewriterFactory() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryRewriterFactory.class);
 
-  static final List<String> DEFAULT_QUERY_REWRITERS_CLASS_NAMES =
+  public static final List<String> DEFAULT_QUERY_REWRITERS_CLASS_NAMES =

Review Comment:
   nit: can you add the `@VisibleForTesting` annotation here?
   Also don't see the new `ArgMinMaxRewriter` on the default list. Does this mean we intend to only enable this for specific tenants via setting the broker config?



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.

Review Comment:
   Can you elaborate on what `PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)` here means? What's `#0` and `2` for example? What determines #0 (functionId)'s value?



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,

Review Comment:
   Elaborate on how function ID is decided, i.e. it's determined by the measuring expressions, and each unique list of measuring expressions will result in a new function ID?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentArgMinMaxAggregationFunction.java:
##########
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject;
+import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxWrapperValSet;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class ParentArgMinMaxAggregationFunction extends ParentAggregationFunction<ArgMinMaxObject, ArgMinMaxObject> {
+
+  // list of columns that we do min/max on
+  private final List<ExpressionContext> _measuringColumns;
+  // list of columns that we project based on the min/max value
+  private final List<ExpressionContext> _projectionColumns;
+  // true if we are doing argmax, false if we are doing argmin
+  private final boolean _isMax;
+  // the id of the function, this is to associate the result of the parent aggregation function with the
+  // child aggregation functions having the same type(argmin/argmax) and measuring columns
+  private final ExpressionContext _functionIdContext;
+  private final ExpressionContext _numMeasuringColumnContext;
+  // number of columns that we do min/max on
+  private final int _numMeasuringColumns;
+  // number of columns that we project based on the min/max value
+  private final int _numProjectionColumns;
+
+  // The following variable need to be initialized
+
+  // The wrapper classes for the block value sets
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> _argMinMaxWrapperMeasuringColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> _argMinMaxWrapperProjectionColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  // The schema for the measuring columns and projection columns
+  private final ThreadLocal<DataSchema> _measuringColumnSchema = new ThreadLocal<>();
+  private final ThreadLocal<DataSchema> _projectionColumnSchema = new ThreadLocal<>();
+  // If the schemas are initialized
+  private final ThreadLocal<Boolean> _schemaInitiated = ThreadLocal.withInitial(() -> false);

Review Comment:
   nit: rename to `_schemaInitialized`



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DummyGroupByResultHolder.java:
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.groupby;
+
+/**
+ * Placeholder GroupByResultHolder that does noop

Review Comment:
   maybe add a comment on where this placeholder is used?



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "HistogramQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_RECORDS = 2000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String MV_INT_COLUMN = "mvIntColumn";
+  private static final String MV_BYTES_COLUMN = "mvBytesColumn";
+  private static final String MV_STRING_COLUMN = "mvStringColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String GROUP_BY_INT_COLUMN = "groupByIntColumn";
+  private static final String GROUP_BY_MV_INT_COLUMN = "groupByMVIntColumn";
+  private static final String GROUP_BY_INT_COLUMN2 = "groupByIntColumn2";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addMultiValueDimension(MV_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(MV_BYTES_COLUMN, DataType.BYTES)
+      .addMultiValueDimension(MV_STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(GROUP_BY_MV_INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(GROUP_BY_INT_COLUMN2, DataType.INT)
+      .build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return " WHERE intColumn >=  500";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    String[] stringSVVals = new String[]{"a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9", "a11", "a22"};
+    int j = 1;
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, i);
+      record.putValue(LONG_COLUMN, (long) i - NUM_RECORDS / 2);
+      record.putValue(FLOAT_COLUMN, (float) i * 0.5);
+      record.putValue(DOUBLE_COLUMN, (double) i);
+      record.putValue(MV_INT_COLUMN, Arrays.asList(i, i + 1, i + 2));
+      record.putValue(MV_BYTES_COLUMN, Arrays.asList(String.valueOf(i).getBytes(), String.valueOf(i + 1).getBytes(),
+          String.valueOf(i + 2).getBytes()));
+      record.putValue(MV_STRING_COLUMN, Arrays.asList("a" + i, "a" + i + 1, "a" + i + 2));
+      if (i < 20) {
+        record.putValue(STRING_COLUMN, stringSVVals[i % stringSVVals.length]);
+      } else {
+        record.putValue(STRING_COLUMN, "a33");
+      }
+      record.putValue(GROUP_BY_INT_COLUMN, i % 5);
+      record.putValue(GROUP_BY_MV_INT_COLUMN, Arrays.asList(i % 10, (i + 1) % 10));
+      if (i == j) {
+        j *= 2;
+      }
+      record.putValue(GROUP_BY_INT_COLUMN2, j);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+    QueryRewriterFactory.init(String.join(",", QueryRewriterFactory.DEFAULT_QUERY_REWRITERS_CLASS_NAMES)
+        + ",org.apache.pinot.sql.parsers.rewriter.ArgMinMaxRewriter");
+    ResultRewriterFactory
+        .init("org.apache.pinot.core.query.utils.rewriter.ParentAggregationResultRewriter");

Review Comment:
   Can you add a few test cases where we don't call any argmin/argmax aggregation functions while these configs are set? Just to ensure that non-argmin/max aggregations and group by queries still work as expected.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ChildAggregationFunction.java:
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.DummyAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.DummyGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ChildAggregationFunction implements AggregationFunction<Long, Long> {
+
+  private static final int CHILD_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  private static final int CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET = 1;
+  private final ExpressionContext _childFunctionKeyInParent;
+  private final List<ExpressionContext> _resultNameOperands;
+  private final ExpressionContext _childFunctionID;
+
+  ChildAggregationFunction(List<ExpressionContext> operands) {
+    _childFunctionID = operands.get(CHILD_AGGREGATION_FUNCTION_ID_OFFSET);
+    _childFunctionKeyInParent = operands.get(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET);
+    _resultNameOperands = operands.subList(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET + 1, operands.size());
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    ArrayList<ExpressionContext> expressionContexts = new ArrayList<>();
+    expressionContexts.add(_childFunctionID);
+    expressionContexts.add(_childFunctionKeyInParent);
+    expressionContexts.addAll(_resultNameOperands);
+    return expressionContexts;
+  }
+
+  @Override
+  public final AggregationResultHolder createAggregationResultHolder() {
+    return new DummyAggregationResultHolder();
+  }
+
+  @Override
+  public final GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new DummyGroupByResultHolder();
+  }
+
+  @Override
+  public final void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return 0L;
+  }
+
+  @Override
+  public final Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return 0L;
+  }
+
+  @Override
+  public final Long merge(Long intermediateResult1, Long intermediateResult2) {
+    return 0L;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.UNKNOWN;
+  }
+
+  @Override
+  public final Long extractFinalResult(Long longValue) {
+    return 0L;
+  }
+
+  /**
+   * The name of the column as follows:
+   * CHILD_AGGREGATION_NAME_PREFIX + actual function type + operands + CHILD_AGGREGATION_SEPERATOR
+   * + actual function type + parent aggregation function id + CHILD_KEY_SEPERATOR + column key in parent function
+   */

Review Comment:
   an example will be very useful here



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding
+ * columns of the original ARG_MIN/ARG_MAX. For more syntax details please refer to ParentAggregationFunction,
+ * ChildAggregationFunction and ChildAggregationResultRewriter.
+ */
+public class ArgMinMaxRewriter implements QueryRewriter {
+
+  private static final String ARG_MAX = "argmax";
+  private static final String ARG_MIN = "argmin";
+
+  private static final String ARG_MAX_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MAX;
+  private static final String ARG_MIN_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MIN;
+
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    // This map stores the mapping from the list of measuring expressions to the set of projection expressions
+    HashMap<List<Expression>, Set<Expression>> argMinFunctionMap = new HashMap<>();
+    // This map stores the mapping from the list of measuring expressions to the function ID
+    HashMap<List<Expression>, Integer> argMinFunctionIDMap = new HashMap<>();
+
+    HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap = new HashMap<>();
+    HashMap<List<Expression>, Integer> argMaxFunctionIDMap = new HashMap<>();
+
+    Iterator<Expression> iterator = pinotQuery.getSelectList().iterator();
+    while (iterator.hasNext()) {
+      boolean added = extractAndRewriteArgMinMaxFunctions(iterator.next(), argMaxFunctionMap, argMaxFunctionIDMap,
+          argMinFunctionMap, argMinFunctionIDMap);
+      // Remove the original function if it is not added, meaning it is a duplicate
+      if (!added) {
+        iterator.remove();
+      }
+    }
+
+    appendParentArgMinMaxFunctions(false, pinotQuery.getSelectList(), argMinFunctionMap, argMinFunctionIDMap);
+    appendParentArgMinMaxFunctions(true, pinotQuery.getSelectList(), argMaxFunctionMap, argMaxFunctionIDMap);
+
+    return pinotQuery;
+  }
+
+  /**
+   * This method appends the consolidated ARG_MIN/ARG_MAX functions to the end of the selection list.
+   * The consolidated function call will be in the following format:
+   * ARG_MAX(functionID, numMeasuringColumns, measuringColumn1, measuringColumn2, ...,
+   *  projectionColumn1, projectionColumn2, ...)
+   *  where functionID is the ID of the consolidated function, numMeasuringColumns is the number of measuring
+   *  columns, measuringColumn1, measuringColumn2, ... are the measuring columns, and projectionColumn1,
+   *  projectionColumn2, ... are the projection columns.
+   *  The number of projection columns is the same as the number of ARG_MIN/ARG_MAX functions with the same
+   *  measuring columns.
+   */
+  private void appendParentArgMinMaxFunctions(boolean isMax, List<Expression> selectList,
+      HashMap<List<Expression>, Set<Expression>> argMinMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMinMaxFunctionIDMap) {
+    for (Map.Entry<List<Expression>, Set<Expression>> entry : argMinMaxFunctionMap.entrySet()) {
+      Literal functionID = new Literal();
+      functionID.setLongValue(argMinMaxFunctionIDMap.get(entry.getKey()));
+      Literal numMeasuringColumns = new Literal();
+      numMeasuringColumns.setLongValue(entry.getKey().size());
+
+      Function parentFunction = new Function(isMax ? ARG_MAX_PARENT : ARG_MIN_PARENT);
+      parentFunction.addToOperands(new Expression(ExpressionType.LITERAL).setLiteral(functionID));
+      parentFunction.addToOperands(new Expression(ExpressionType.LITERAL).setLiteral(numMeasuringColumns));
+      for (Expression expression : entry.getKey()) {
+        parentFunction.addToOperands(expression);
+      }
+      for (Expression expression : entry.getValue()) {
+        parentFunction.addToOperands(expression);
+      }
+      selectList.add(new Expression(ExpressionType.FUNCTION).setFunctionCall(parentFunction));
+    }
+  }
+
+  /**
+   * This method extracts the ARG_MIN/ARG_MAX functions from the given expression and rewrites the functions
+   * with the same measuring expressions to use the same function ID.
+   * @return true if the function is not duplicated, false otherwise.
+   */
+  private boolean extractAndRewriteArgMinMaxFunctions(Expression expression,
+      HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap,
+      HashMap<List<Expression>, Integer> argMaxFunctionIDMap,
+      HashMap<List<Expression>, Set<Expression>> argMinFunctionMap,
+      HashMap<List<Expression>, Integer> argMinFunctionIDMap) {
+    Function function = expression.getFunctionCall();
+    if (function == null) {
+      return true;
+    }
+    String functionName = function.getOperator();
+    if (!(functionName.equals("argmin") || functionName.equals("argmax"))) {

Review Comment:
   nit: use constant that were defined here?



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding

Review Comment:
   nit: Latter -> Later



##########
pinot-core/src/test/java/org/apache/pinot/queries/ArgMinMaxTest.java:
##########
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Queries test for histogram queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ArgMinMaxTest extends BaseQueriesTest {

Review Comment:
   Can you add a few tests with `AS` to ensure that some kind of meaningful exception is thrown? Or at least show what the behavior will be with aliases? 



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ArgMinMaxRewriter.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * This rewriter rewrites ARG_MIN/ARG_MAX function, so that the functions with the same measuring expressions
+ * are consolidated and added as a single function with a list of projection expressions. For example, the query
+ * "SELECT ARG_MIN(col1, col2, col3), ARG_MIN(col1, col2, col4) FROM myTable" will be consolidated to a single
+ * function "PARENT_ARG_MIN(#0, 2, col1, col2, col3, col4)". and added to the end of the selection list.
+ * While the original ARG_MIN(col1, col2, col3) and ARG_MIN(col1, col2, col4) will be rewritten to
+ * CHILD_ARG_MIN(#0, col3, col1, col2, col3) and CHILD_ARG_MIN(#0, col4, col1, col2, col4) respectively.
+ * The 2 new parameters for CHILD_ARG_MIN are the function ID and the projection expression,
+ * used as column key for result column filler.
+ * Latter, the aggregation, result of the consolidated function will be filled into the corresponding
+ * columns of the original ARG_MIN/ARG_MAX. For more syntax details please refer to ParentAggregationFunction,
+ * ChildAggregationFunction and ChildAggregationResultRewriter.
+ */
+public class ArgMinMaxRewriter implements QueryRewriter {
+
+  private static final String ARG_MAX = "argmax";
+  private static final String ARG_MIN = "argmin";
+
+  private static final String ARG_MAX_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MAX;
+  private static final String ARG_MIN_PARENT =
+      CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + ARG_MIN;
+
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    // This map stores the mapping from the list of measuring expressions to the set of projection expressions
+    HashMap<List<Expression>, Set<Expression>> argMinFunctionMap = new HashMap<>();
+    // This map stores the mapping from the list of measuring expressions to the function ID
+    HashMap<List<Expression>, Integer> argMinFunctionIDMap = new HashMap<>();
+
+    HashMap<List<Expression>, Set<Expression>> argMaxFunctionMap = new HashMap<>();
+    HashMap<List<Expression>, Integer> argMaxFunctionIDMap = new HashMap<>();
+
+    Iterator<Expression> iterator = pinotQuery.getSelectList().iterator();
+    while (iterator.hasNext()) {
+      boolean added = extractAndRewriteArgMinMaxFunctions(iterator.next(), argMaxFunctionMap, argMaxFunctionIDMap,
+          argMinFunctionMap, argMinFunctionIDMap);
+      // Remove the original function if it is not added, meaning it is a duplicate
+      if (!added) {
+        iterator.remove();
+      }
+    }
+
+    appendParentArgMinMaxFunctions(false, pinotQuery.getSelectList(), argMinFunctionMap, argMinFunctionIDMap);
+    appendParentArgMinMaxFunctions(true, pinotQuery.getSelectList(), argMaxFunctionMap, argMaxFunctionIDMap);
+
+    return pinotQuery;
+  }
+
+  /**
+   * This method appends the consolidated ARG_MIN/ARG_MAX functions to the end of the selection list.
+   * The consolidated function call will be in the following format:
+   * ARG_MAX(functionID, numMeasuringColumns, measuringColumn1, measuringColumn2, ...,
+   *  projectionColumn1, projectionColumn2, ...)
+   *  where functionID is the ID of the consolidated function, numMeasuringColumns is the number of measuring
+   *  columns, measuringColumn1, measuringColumn2, ... are the measuring columns, and projectionColumn1,
+   *  projectionColumn2, ... are the projection columns.
+   *  The number of projection columns is the same as the number of ARG_MIN/ARG_MAX functions with the same
+   *  measuring columns.

Review Comment:
   nit: can you fix the comment indentation?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the following format:
+ * aggregationFunctionType + FunctionID + CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column names.

Review Comment:
   what happens if someone specifies `AS` in these queries?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ResultRewriter.java:
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * Interface for rewriting the result of a query

Review Comment:
   Does it make sense to call out somewhere what kind of queries can invoke the `ResultRewriter`? For example, as part of this PR this is invoked for group by and aggregations only.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;
+
+  // the schema of the measuring columns
+  private final DataSchema _keySchema;
+  // the schema of the projection columns
+  private final DataSchema _valSchema;
+
+  // the size of the extremum key cols and value clos

Review Comment:
   nit: clos -> cols
   



##########
pinot-core/src/main/java/org/apache/pinot/core/query/utils/rewriter/ParentAggregationResultRewriter.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.utils.rewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Use the result of parent aggregation functions to populate the result of child aggregation functions.
+ * This implementation is based on the column names of the result schema.
+ * The result column name of a parent aggregation function has the following format:
+ * CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX + aggregationFunctionType + FunctionID
+ * The result column name of corresponding child aggregation function has the following format:
+ * aggregationFunctionType + FunctionID + CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+ * + childFunctionKey
+ * This approach will not work with `AS` clauses as they alter the column names.
+ * TODO: Add support for `AS` clauses.
+ */
+public class ParentAggregationResultRewriter implements ResultRewriter {
+  public ParentAggregationResultRewriter() {
+  }
+
+  public static Map<String, ChildFunctionMapping> createChildFunctionMapping(DataSchema schema, Object[] row) {
+    Map<String, ChildFunctionMapping> childFunctionMapping = new HashMap<>();
+    for (int i = 0; i < schema.size(); i++) {
+      String columnName = schema.getColumnName(i);
+      if (columnName.startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        ParentAggregationFunctionResultObject parent = (ParentAggregationFunctionResultObject) row[i];
+
+        DataSchema nestedSchema = parent.getSchema();
+        for (int j = 0; j < nestedSchema.size(); j++) {
+          String childColumnKey = nestedSchema.getColumnName(j);
+          String originalChildFunctionKey =
+              columnName.substring(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX.length())
+                  + CommonConstants.RewriterConstants.CHILD_KEY_SEPERATOR + childColumnKey;
+          // aggregationFunctionType + childFunctionID + CHILD_KEY_SEPERATOR + childFunctionKeyInParent
+          childFunctionMapping.put(originalChildFunctionKey, new ChildFunctionMapping(parent, j, i));
+        }
+      }
+    }
+    return childFunctionMapping;
+  }
+
+  public RewriterResult rewrite(DataSchema dataSchema, List<Object[]> rows) {
+    int numParentAggregationFunctions = 0;
+    // Count the number of parent aggregation functions
+    for (int i = 0; i < dataSchema.size(); i++) {
+      if (dataSchema.getColumnName(i).startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        numParentAggregationFunctions++;
+      }
+    }
+
+    if (numParentAggregationFunctions == 0 || rows.isEmpty()) {
+      // no change to the result
+      return new RewriterResult(dataSchema, rows);
+    }
+
+    // Create a mapping from the child aggregation function name to the child aggregation function
+    Map<String, ChildFunctionMapping> childFunctionMapping = createChildFunctionMapping(dataSchema, rows.get(0));
+
+    String[] newColumnNames = new String[dataSchema.size() - numParentAggregationFunctions];
+    DataSchema.ColumnDataType[] newColumnDataTypes
+        = new DataSchema.ColumnDataType[dataSchema.size() - numParentAggregationFunctions];
+
+    // Create a mapping from the function offset in the final aggregation result
+    // to its own/parent function offset in the original aggregation result
+    Map<Integer, Integer> aggregationFunctionIndexMapping = new HashMap<>();
+    // Create a set of the result indices of the child aggregation functions
+    Set<Integer> childAggregationFunctionIndices = new HashSet<>();
+    // Create a mapping from the result aggregation function index to the nested index of the
+    // child aggregation function in the parent aggregation function
+    Map<Integer, Integer> childAggregationFunctionNestedIndexMapping = new HashMap<>();
+    // Create a set of the result indices of the parent aggregation functions
+    Set<Integer> parentAggregationFunctionIndices = new HashSet<>();
+
+    for (int i = 0, j = 0; i < dataSchema.size(); i++) {
+      String columnName = dataSchema.getColumnName(i);
+      // Skip the parent aggregation functions
+      if (columnName.startsWith(CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX)) {
+        parentAggregationFunctionIndices.add(i);
+        continue;
+      }
+
+      // for child aggregation functions and regular columns in the result
+      // create a new schema and populate the new column names and data types
+      // also populate the offset mappings used to rewrite the result
+      if (columnName.startsWith(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX)) {
+        // This is a child column of a parent aggregation function
+        String childAggregationFunctionNameWithKey =
+            columnName.substring(CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX.length());
+        String[] s = childAggregationFunctionNameWithKey
+            .split(CommonConstants.RewriterConstants.CHILD_AGGREGATION_SEPERATOR);
+        newColumnNames[j] = s[0];
+        ChildFunctionMapping childFunction = childFunctionMapping.get(s[1]);
+        newColumnDataTypes[j] = childFunction.getParent().getSchema()
+            .getColumnDataType(childFunction.getNestedOffset());
+
+        childAggregationFunctionNestedIndexMapping.put(j, childFunction.getNestedOffset());
+        childAggregationFunctionIndices.add(j);
+        aggregationFunctionIndexMapping.put(j, childFunction.getOffset());
+      } else {
+        // This is a regular column
+        newColumnNames[j] = columnName;
+        newColumnDataTypes[j] = dataSchema.getColumnDataType(i);
+
+        aggregationFunctionIndexMapping.put(j, i);
+      }
+      j++;
+    }
+
+    DataSchema newDataSchema = new DataSchema(newColumnNames, newColumnDataTypes);
+    List<Object[]> newRows = new ArrayList<>();
+
+    for (Object[] row : rows) {
+      int maxRows = parentAggregationFunctionIndices.stream().map(k -> {
+        ParentAggregationFunctionResultObject parentAggregationFunctionResultObject =
+            (ParentAggregationFunctionResultObject) row[k];
+        return parentAggregationFunctionResultObject.getNumberOfRows();
+      }).max(Integer::compareTo).orElse(0);
+      maxRows = maxRows == 0 ? 1 : maxRows;
+
+      List<Object[]> newRowsBuffer = new ArrayList<>();
+      for (int rowIter = 0; rowIter < maxRows; rowIter++) {
+        Object[] newRow = new Object[newDataSchema.size()];
+        for (int fieldIter = 0; fieldIter < newDataSchema.size(); fieldIter++) {
+          // If the field is a child aggregation function, extract the value from the parent result
+          if (childAggregationFunctionIndices.contains(fieldIter)) {
+            int offset = aggregationFunctionIndexMapping.get(fieldIter);
+            int nestedOffset = childAggregationFunctionNestedIndexMapping.get(fieldIter);
+            ParentAggregationFunctionResultObject parentAggregationFunctionResultObject =
+                (ParentAggregationFunctionResultObject) row[offset];
+            // If the parent result has more rows than the current row, extract the value from the row
+            if (rowIter < parentAggregationFunctionResultObject.getNumberOfRows()) {
+              newRow[fieldIter] = parentAggregationFunctionResultObject.getField(rowIter, nestedOffset);
+            } else {
+              newRow[fieldIter] = null;
+            }
+          } else { // If the field is a regular column, extract the value from the row, only the first row has value
+            if (rowIter == 0) {
+              newRow[fieldIter] = row[aggregationFunctionIndexMapping.get(fieldIter)];
+            } else {
+              newRow[fieldIter] = null;
+            }
+          }
+        }
+        newRowsBuffer.add(newRow);
+      }
+      newRows.addAll(newRowsBuffer);
+    }
+    return new RewriterResult(newDataSchema, newRows);
+  }
+
+  /**
+   * Mapping from child function key to the
+   * parent result object,
+   * offset of the parent result column in original result row,
+   * and the nested offset of the child function result in the parent data block
+   */

Review Comment:
   nit: fix formatting
   Also I think an example here might be useful to more easily understand what offset and nestedOffset really are



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;
+
+  // the schema of the measuring columns
+  private final DataSchema _keySchema;

Review Comment:
   from the variable names used in this class, it's hard to know what key and value are here. maybe add a comment somewhere just explaining that measuring columns are keys and the projection columns values, and that the keys are used purely for comparisons. or consider renaming them to be measuring and projecting instead?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxObject.java:
##########
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+
+
+public class ArgMinMaxObject implements ParentAggregationFunctionResultObject {
+
+  // if the object is created but not yet populated, this happens e.g. when a server has no data for
+  // the query and returns a default value
+  public static final int NOT_NULL_OBJECT = 1;
+  public static final int IS_NULL_OBJECT = 0;
+  // if the object contains non null values
+  private boolean _isNull;
+
+  // if the value is stored in a mutable list, this is true only when the Object is deserialized
+  // from a byte buffer
+  private boolean _mutable;
+
+  // the schema of the measuring columns
+  private final DataSchema _keySchema;
+  // the schema of the projection columns
+  private final DataSchema _valSchema;
+
+  // the size of the extremum key cols and value clos
+  private final int _sizeOfExtremumKeys;
+  private final int _sizeOfExtremumVals;
+
+  // the current extremum keys
+  private Comparable[] _extremumKeys = null;
+  // the current extremum values
+  private final List<Object[]> _extremumValues = new ArrayList<>();
+
+  // used for ser/de
+  private DataBlock _immutableKeys;
+  private DataBlock _immutableVals;

Review Comment:
   recommend adding a note on how these interact with `_mutable` so that it is clear when one is used over the other.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ChildAggregationFunction.java:
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.DummyAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.DummyGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ChildAggregationFunction implements AggregationFunction<Long, Long> {
+
+  private static final int CHILD_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  private static final int CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET = 1;
+  private final ExpressionContext _childFunctionKeyInParent;
+  private final List<ExpressionContext> _resultNameOperands;
+  private final ExpressionContext _childFunctionID;
+
+  ChildAggregationFunction(List<ExpressionContext> operands) {
+    _childFunctionID = operands.get(CHILD_AGGREGATION_FUNCTION_ID_OFFSET);
+    _childFunctionKeyInParent = operands.get(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET);
+    _resultNameOperands = operands.subList(CHILD_AGGREGATION_FUNCTION_COLUMN_KEY_OFFSET + 1, operands.size());
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    ArrayList<ExpressionContext> expressionContexts = new ArrayList<>();
+    expressionContexts.add(_childFunctionID);
+    expressionContexts.add(_childFunctionKeyInParent);
+    expressionContexts.addAll(_resultNameOperands);
+    return expressionContexts;
+  }
+
+  @Override
+  public final AggregationResultHolder createAggregationResultHolder() {
+    return new DummyAggregationResultHolder();
+  }
+
+  @Override
+  public final GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new DummyGroupByResultHolder();
+  }
+
+  @Override
+  public final void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  }
+
+  @Override
+  public final Long extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return 0L;
+  }
+
+  @Override
+  public final Long extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return 0L;
+  }
+
+  @Override
+  public final Long merge(Long intermediateResult1, Long intermediateResult2) {
+    return 0L;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.LONG;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.UNKNOWN;
+  }
+
+  @Override
+  public final Long extractFinalResult(Long longValue) {
+    return 0L;
+  }
+
+  /**
+   * The name of the column as follows:
+   * CHILD_AGGREGATION_NAME_PREFIX + actual function type + operands + CHILD_AGGREGATION_SEPERATOR
+   * + actual function type + parent aggregation function id + CHILD_KEY_SEPERATOR + column key in parent function
+   */
+  @Override
+  public final String getResultColumnName() {
+    String type = getType().getName().toLowerCase();
+    return CommonConstants.RewriterConstants.CHILD_AGGREGATION_NAME_PREFIX
+        // above is the prefix for all child aggregation functions
+
+        + type + "(" + _resultNameOperands.stream().map(ExpressionContext::toString)
+        .collect(Collectors.joining(",")) + ")"
+        // above is the actual child aggregation function name we want to return to the user
+
+        + CommonConstants.RewriterConstants.CHILD_AGGREGATION_SEPERATOR
+        + type
+        + _childFunctionID.getLiteral().getStringValue()
+        + CommonConstants.RewriterConstants.CHILD_KEY_SEPERATOR
+        + _childFunctionKeyInParent.toString();
+    // above is the column key in the parent aggregation function
+  }
+
+  @Override
+  public String toExplainString() {

Review Comment:
   Recommend adding a test or two to the `ExplainPlanQueriesTest` file to test out the EXPLAIN PLAN output



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentAggregationFunction.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ParentAggregationFunction<I, F extends ParentAggregationFunctionResultObject>
+    implements AggregationFunction<I, F> {
+
+  protected static final int PARENT_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  protected List<ExpressionContext> _arguments;
+
+  ParentAggregationFunction(List<ExpressionContext> arguments) {
+    _arguments = arguments;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  // The name of the column is the prefix of the parent aggregation function + the name of the
+  // aggregation function + the id of the parent aggregation function
+  @Override
+  public String getResultColumnName() {
+    return CommonConstants.RewriterConstants.PARENT_AGGREGATION_NAME_PREFIX
+        + getType().getName().toLowerCase()
+        + _arguments.get(PARENT_AGGREGATION_FUNCTION_ID_OFFSET).getLiteral().getIntValue();
+  }
+
+  public String toExplainString() {

Review Comment:
   Recommend adding a test or two to the ExplainPlanQueriesTest file to test out the EXPLAIN PLAN output



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentArgMinMaxAggregationFunction.java:
##########
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject;
+import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxWrapperValSet;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class ParentArgMinMaxAggregationFunction extends ParentAggregationFunction<ArgMinMaxObject, ArgMinMaxObject> {
+
+  // list of columns that we do min/max on
+  private final List<ExpressionContext> _measuringColumns;
+  // list of columns that we project based on the min/max value
+  private final List<ExpressionContext> _projectionColumns;
+  // true if we are doing argmax, false if we are doing argmin
+  private final boolean _isMax;
+  // the id of the function, this is to associate the result of the parent aggregation function with the
+  // child aggregation functions having the same type(argmin/argmax) and measuring columns
+  private final ExpressionContext _functionIdContext;
+  private final ExpressionContext _numMeasuringColumnContext;
+  // number of columns that we do min/max on
+  private final int _numMeasuringColumns;
+  // number of columns that we project based on the min/max value
+  private final int _numProjectionColumns;
+
+  // The following variable need to be initialized
+
+  // The wrapper classes for the block value sets
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> _argMinMaxWrapperMeasuringColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  private final ThreadLocal<List<ArgMinMaxWrapperValSet>> _argMinMaxWrapperProjectionColumnSets =
+      ThreadLocal.withInitial(ArrayList::new);
+  // The schema for the measuring columns and projection columns
+  private final ThreadLocal<DataSchema> _measuringColumnSchema = new ThreadLocal<>();
+  private final ThreadLocal<DataSchema> _projectionColumnSchema = new ThreadLocal<>();
+  // If the schemas are initialized
+  private final ThreadLocal<Boolean> _schemaInitiated = ThreadLocal.withInitial(() -> false);
+
+  public ParentArgMinMaxAggregationFunction(List<ExpressionContext> arguments, boolean isMax) {
+
+    super(arguments);
+    _isMax = isMax;
+    _functionIdContext = arguments.get(0);
+
+    _numMeasuringColumnContext = arguments.get(1);
+    _numMeasuringColumns = _numMeasuringColumnContext.getLiteral().getIntValue();
+
+    _measuringColumns = arguments.subList(2, 2 + _numMeasuringColumns);
+    _projectionColumns = arguments.subList(2 + _numMeasuringColumns, arguments.size());
+    _numProjectionColumns = _projectionColumns.size();
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return _isMax ? AggregationFunctionType.ARGMAX : AggregationFunctionType.ARGMIN;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    ArrayList<ExpressionContext> expressionContexts = new ArrayList<>();
+    expressionContexts.add(_functionIdContext);
+    expressionContexts.add(_numMeasuringColumnContext);
+    expressionContexts.addAll(_measuringColumns);
+    expressionContexts.addAll(_projectionColumns);
+    return expressionContexts;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @SuppressWarnings("LoopStatementThatDoesntLoop")
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    ArgMinMaxObject argMinMaxObject = aggregationResultHolder.getResult();
+
+    if (argMinMaxObject == null) {
+      initializeWithNewDataBlocks(blockValSetMap);
+      argMinMaxObject = new ArgMinMaxObject(_measuringColumnSchema.get(), _projectionColumnSchema.get());
+    }
+
+    List<Integer> rowIds = new ArrayList<>();
+    for (int i = 0; i < length; i++) {
+      int compareResult = argMinMaxObject.compareAndSetKey(_argMinMaxWrapperMeasuringColumnSets.get(), i, _isMax);
+      if (compareResult == 0) {
+        // same key, add the rowId to the list
+        rowIds.add(i);
+      } else if (compareResult > 0) {
+        // new key is set, clear the list and add the new rowId
+        rowIds.clear();
+        rowIds.add(i);
+      }
+    }
+
+    // for all the rows that are associated with the extremum key, add the projection columns
+    for (Integer rowId : rowIds) {
+      argMinMaxObject.addVal(_argMinMaxWrapperProjectionColumnSets.get(), rowId);
+    }
+
+    aggregationResultHolder.setValue(argMinMaxObject);
+  }
+
+  // this method is called to initialize the schemas if they are not initialized
+  // and to set the new block value sets for the wrapper classes
+  private void initializeWithNewDataBlocks(Map<ExpressionContext, BlockValSet> blockValSetMap) {

Review Comment:
   this function is quite large and hard to read. can you modularize it a bit if possible?



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/argminmax/ArgMinMaxWrapperValSet.java:
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.utils.argminmax;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+
+
+public class ArgMinMaxWrapperValSet {

Review Comment:
   is my understanding correct that the comparator functions are only used for measuring columns and the value related functions only used for projection columns? Just wondering if it'll be cleaner to have a base class and extend it to add the comparator functions for measuring columns and the value ones for the projection columns. Just to make the responsibility very clear on how these APIs are used. Not a must do, but for someone just looking at this class without looking at the usage the APIs are a bit confusing
   
   e.g. getComparable and getValue return the same underlying arrays



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/ParentAggregationFunction.java:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.utils.ParentAggregationFunctionResultObject;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public abstract class ParentAggregationFunction<I, F extends ParentAggregationFunctionResultObject>
+    implements AggregationFunction<I, F> {
+
+  protected static final int PARENT_AGGREGATION_FUNCTION_ID_OFFSET = 0;
+  protected List<ExpressionContext> _arguments;
+
+  ParentAggregationFunction(List<ExpressionContext> arguments) {
+    _arguments = arguments;
+  }
+
+  @Override
+  public final DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  // The name of the column is the prefix of the parent aggregation function + the name of the
+  // aggregation function + the id of the parent aggregation function

Review Comment:
   an example will be useful here



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DummyAggregationResultHolder.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.groupby;
+
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+
+
+/**
+ * Placeholder AggregationResultHolder that does noop

Review Comment:
   maybe add a comment on where this placeholder is used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org