You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/06/25 16:56:27 UTC

[incubator-pinot] branch master updated: Support json path expressions in query. (#6998)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c856c6c  Support json path expressions in query. (#6998)
c856c6c is described below

commit c856c6ca1938d52ec3e287e112695a1425032e51
Author: Amrish Lal <am...@gmail.com>
AuthorDate: Fri Jun 25 09:56:09 2021 -0700

    Support json path expressions in query. (#6998)
    
    * Support json path expressions in query.
    
    * Cleanup.
    
    * Support aliases for function expressions with json path.
    
    * Cleanup.
    
    * Don't support json path expressions in HAVING clause.
    
    * Cleanup.
    
    * Support json path expressions in HAVING clause
    
    * Support arrays in json path expressions.
    
    * Cleanup.
    
    * Cleanup.
    
    * Cleanup.
    
    * Codereview changes.
    
    * Cleanup.
    
    * Add SQL test cases with JSON path expressions.
    
    * Add SQL test cases with JSON path expressions.
    
    * Support non-indexed JSON columns in json path expressions.
    
    * Support non-indexed JSON columns in json path expressions.
    
    * Codereview changes.
    
    * Codereview changes.
    
    * Codereview changes.
    
    * Codereview changes.
    
    * Rebuild.
    
    * Codereview changes.
    
    * Codereview changes.
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  10 +-
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |  58 ++
 .../pinot/core/query/optimizer/QueryOptimizer.java |  20 +-
 .../statement/JsonStatementOptimizer.java          | 587 +++++++++++++++++++++
 .../optimizer/statement/StatementOptimizer.java    |  35 ++
 .../statement/JsonStatementOptimizerTest.java      | 204 +++++++
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  39 ++
 .../apache/pinot/queries/JsonPathQueriesTest.java  | 302 +++++++++++
 .../java/org/apache/pinot/spi/data/Schema.java     |  14 +-
 9 files changed, 1261 insertions(+), 8 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index e6c8a17..d692619 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -345,23 +345,25 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     if (offlineTableName != null && realtimeTableName != null) {
       // Hybrid
       offlineBrokerRequest = getOfflineBrokerRequest(brokerRequest);
-      _queryOptimizer.optimize(offlineBrokerRequest.getPinotQuery(), schema);
+      _queryOptimizer
+          .optimize(offlineBrokerRequest.getPinotQuery(), _tableCache.getTableConfig(offlineTableName), schema);
       realtimeBrokerRequest = getRealtimeBrokerRequest(brokerRequest);
-      _queryOptimizer.optimize(realtimeBrokerRequest.getPinotQuery(), schema);
+      _queryOptimizer
+          .optimize(realtimeBrokerRequest.getPinotQuery(), _tableCache.getTableConfig(realtimeTableName), schema);
       requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID);
       requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName));
       requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName));
     } else if (offlineTableName != null) {
       // OFFLINE only
       setTableName(brokerRequest, offlineTableName);
-      _queryOptimizer.optimize(pinotQuery, schema);
+      _queryOptimizer.optimize(pinotQuery, _tableCache.getTableConfig(offlineTableName), schema);
       offlineBrokerRequest = brokerRequest;
       requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE);
       requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName));
     } else {
       // REALTIME only
       setTableName(brokerRequest, realtimeTableName);
-      _queryOptimizer.optimize(pinotQuery, schema);
+      _queryOptimizer.optimize(pinotQuery, _tableCache.getTableConfig(realtimeTableName), schema);
       realtimeBrokerRequest = brokerRequest;
       requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
       requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName));
diff --git a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index b72db4b..d6158d8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -927,7 +927,15 @@ public class CalciteSqlParser {
         break;
       case OTHER:
       case OTHER_FUNCTION:
+      case DOT:
         functionName = functionNode.getOperator().getName().toUpperCase();
+        if (functionName.equals("ITEM") || functionName.equals("DOT")) {
+          // Calcite parses path expression such as "data[0][1].a.b[0]" into a chain of ITEM and/or DOT
+          // functions. Collapse this chain into an identifier.
+          StringBuffer path = new StringBuffer();
+          compilePathExpression(functionName, functionNode, path);
+          return RequestUtils.getIdentifierExpression(path.toString());
+        }
         break;
       default:
         functionName = functionKind.name();
@@ -950,6 +958,56 @@ public class CalciteSqlParser {
     return functionExpression;
   }
 
+  /**
+   * Convert Calcite operator tree made up of ITEM and DOT functions to an identifier. For example, the operator tree
+   * shown below will be converted to IDENTIFIER "jsoncolumn.data[0][1].a.b[0]".
+   *
+   * ├── ITEM(jsoncolumn.data[0][1].a.b[0])
+   *      ├── LITERAL (0)
+   *      └── DOT (jsoncolumn.data[0][1].a.b)
+   *            ├── IDENTIFIER (b)
+   *            └── DOT (jsoncolumn.data[0][1].a)
+   *                  ├── IDENTIFIER (a)
+   *                  └── ITEM (jsoncolumn.data[0][1])
+   *                        ├── LITERAL (1)
+   *                        └── ITEM (jsoncolumn.data[0])
+   *                              ├── LITERAL (0)
+   *                              └── IDENTIFIER (jsoncolumn.data)
+   *
+   * @param functionName Name of the function ("DOT" or "ITEM")
+   * @param functionNode Root node of the DOT and/or ITEM operator function chain.
+   * @param path Buffer to which the string form of the DOT and/or ITEM function chain is appended.
+   */
+  private static void compilePathExpression(String functionName, SqlBasicCall functionNode, StringBuffer path) {
+    SqlNode[] operands = functionNode.getOperands();
+
+    // Compile first operand of the function (either an identifier or another DOT and/or ITEM function).
+    SqlKind kind0 = operands[0].getKind();
+    if (kind0 == SqlKind.IDENTIFIER) {
+      path.append(((SqlIdentifier) operands[0]).toString());
+    } else if (kind0 == SqlKind.DOT || kind0 == SqlKind.OTHER_FUNCTION) {
+      SqlBasicCall function0 = (SqlBasicCall) operands[0];
+      String name0 = function0.getOperator().getName();
+      if (name0.equals("ITEM") || name0.equals("DOT")) {
+        compilePathExpression(name0, function0, path);
+      } else {
+        throw new SqlCompilationException("SELECT list item has bad path expression.");
+      }
+    } else {
+      throw new SqlCompilationException("SELECT list item has bad path expression.");
+    }
+
+    // Compile second operand: an identifier is appended as ".name", a literal is appended as "[value]".
+    SqlKind kind1 = operands[1].getKind();
+    if (kind1 == SqlKind.IDENTIFIER) {
+      path.append(".").append(((SqlIdentifier) operands[1]).getSimple());
+    } else if (kind1 == SqlKind.LITERAL) {
+      path.append("[").append(((SqlLiteral) operands[1]).toValue()).append("]");
+    } else {
+      throw new SqlCompilationException("SELECT list item has bad path expression.");
+    }
+  }
+
   private static void validateFunction(String functionName, List<Expression> operands) {
     switch (canonicalize(functionName)) {
       case "jsonextractscalar":
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
index 4cba605..f77f918d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
@@ -33,6 +33,9 @@ import org.apache.pinot.core.query.optimizer.filter.MergeEqInFilterOptimizer;
 import org.apache.pinot.core.query.optimizer.filter.MergeRangeFilterOptimizer;
 import org.apache.pinot.core.query.optimizer.filter.NumericalFilterOptimizer;
 import org.apache.pinot.core.query.optimizer.filter.TimePredicateFilterOptimizer;
+import org.apache.pinot.core.query.optimizer.statement.JsonStatementOptimizer;
+import org.apache.pinot.core.query.optimizer.statement.StatementOptimizer;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -46,6 +49,9 @@ public class QueryOptimizer {
       .asList(new FlattenAndOrFilterOptimizer(), new MergeEqInFilterOptimizer(), new NumericalFilterOptimizer(),
           new TimePredicateFilterOptimizer(), new MergeRangeFilterOptimizer());
 
+  private static final List<StatementOptimizer> STATEMENT_OPTIMIZERS = Arrays
+      .asList(new JsonStatementOptimizer());
+
   /**
    * Optimizes the given PQL query.
    */
@@ -61,10 +67,13 @@ public class QueryOptimizer {
     }
   }
 
-  /**
-   * Optimizes the given SQL query.
-   */
+  /** Optimizes the given SQL query. */
   public void optimize(PinotQuery pinotQuery, @Nullable Schema schema) {
+    optimize(pinotQuery, null, schema);
+  }
+
+  /** Optimizes the given SQL query. */
+  public void optimize(PinotQuery pinotQuery, @Nullable TableConfig tableConfig, @Nullable Schema schema) {
     Expression filterExpression = pinotQuery.getFilterExpression();
     if (filterExpression != null) {
       for (FilterOptimizer filterOptimizer : FILTER_OPTIMIZERS) {
@@ -72,5 +81,10 @@ public class QueryOptimizer {
       }
       pinotQuery.setFilterExpression(filterExpression);
     }
+
+    // Run statement optimizer after filter has already been optimized.
+    for (StatementOptimizer statementOptimizer : STATEMENT_OPTIMIZERS) {
+      statementOptimizer.optimize(pinotQuery, tableConfig, schema);
+    }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizer.java
new file mode 100644
index 0000000..31b5b09
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizer.java
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.statement;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.function.scalar.ArithmeticFunctions;
+import org.apache.pinot.common.function.scalar.DateTimeFunctions;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.pql.parsers.pql2.ast.FloatingPointLiteralAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.IntegerLiteralAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.LiteralAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.StringLiteralAstNode;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.Pair;
+
+
+/**
+ * This class will rewrite a query that has json path expressions into a query that uses JSON_EXTRACT_SCALAR and
+ * JSON_MATCH functions.
+ *
+ * Example 1:
+ *   From : SELECT jsonColumn.name.first
+ *             FROM testTable
+ *            WHERE jsonColumn.name.first IS NOT NULL
+ *   TO   : SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.name.first', 'STRING', 'null') AS jsonColumn.name.first
+ *             FROM testTable
+ *            WHERE JSON_MATCH(jsonColumn, '"$.name.first" IS NOT NULL')
+ *
+ * Output datatype of any json path expression is 'STRING'. However, if json path expression appears as an argument to
+ * a numerical function, then output of json path expression is set to 'DOUBLE' as shown in the example below.
+ *
+ * Example 2:
+ *   From:   SELECT MIN(jsonColumn.id - 5)
+ *             FROM testTable
+ *            WHERE jsonColumn.id IS NOT NULL
+ *   To:     SELECT MIN(MINUS(JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'DOUBLE', Double.NEGATIVE_INFINITY),5)) AS min(minus(jsonColumn.id, '5'))
+ *             FROM testTable
+ *            WHERE JSON_MATCH(jsonColumn, '"$.id" IS NOT NULL')
+ *
+ * Example 3:
+ *   From:  SELECT jsonColumn.id, count(*)
+ *             FROM testTable
+ *            WHERE jsonColumn.name.first = 'Daffy' OR jsonColumn.id = 101
+ *         GROUP BY jsonColumn.id
+ *   To:    SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'STRING', 'null') AS jsonColumn.id, count(*)
+ *             FROM testTable
+ *            WHERE JSON_MATCH(jsonColumn, '"$.name.first" = ''Daffy''') OR JSON_MATCH(jsonColumn, '"$.id" = 101')
+ *         GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'STRING', 'null');
+ *
+ * Example 4:
+ *   From: SELECT jsonColumn.name.last, count(*)
+ *            FROM testTable
+ *        GROUP BY jsonColumn.name.last
+ *          HAVING jsonColumn.name.last = 'mouse'
+ *     To: SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') AS jsonColumn.name.last, count(*)
+ *               FROM testTable
+ *           GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null')
+ *             HAVING JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') = 'mouse'
+ *
+ * Notes:
+ * 1) In a filter expression, if json path appears on the left-hand side, the right-hand side must be a literal. In
+ *    future this can be changed to have any expression on the right-hand side by implementing a function that would
+ *    convert any {@link Expression} into SQL fragment that can be used in JSON_MATCH. Currently only literals are
+ *    converted into SQL fragments (see {@link #getLiteralSQL} function).
+ * 2) In WHERE clause each json path expression will be replaced with a JSON_MATCH function. If there are multiple
+ *    json path expressions, they will be replaced by multiple JSON_MATCH functions. We currently don't fold multiple
+ *    JSON_MATCH functions into a single JSON_MATCH function.
+ */
+public class JsonStatementOptimizer implements StatementOptimizer {
+
+  /**
+   * Maintain a list of numerical functions that require json path expressions to output numerical values. This allows
+   * us to implicitly convert the output of json path expression to DOUBLE. TODO: There are better ways of doing this
+   * if we were to move to a new storage (currently STRING) for JSON or functions were to pre-declare their input
+   * data types.
+   */
+  private static Set<String> numericalFunctions = getNumericalFunctionList();
+
+  /**
+   * A list of functions that require json path expression to output LONG value. This allows us to implicitly convert
+   * the output of json path expression to LONG.
+   */
+  private static Set<String> datetimeFunctions = getDateTimeFunctionList();
+
+  /**
+   * Null value constants for different column types. Used while rewriting json path expression to JSON_EXTRACT_SCALAR function.
+   */
+  private static LiteralAstNode DEFAULT_DIMENSION_NULL_VALUE_OF_INT_AST =
+      new IntegerLiteralAstNode(FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT);
+  private static LiteralAstNode DEFAULT_DIMENSION_NULL_VALUE_OF_LONG_AST =
+      new IntegerLiteralAstNode(FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG);
+  private static LiteralAstNode DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT_AST =
+      new FloatingPointLiteralAstNode(FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT);
+  private static LiteralAstNode DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE_AST =
+      new FloatingPointLiteralAstNode(FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE);
+  private static LiteralAstNode DEFAULT_DIMENSION_NULL_VALUE_OF_STRING_AST =
+      new StringLiteralAstNode(FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING);
+
+  @Override
+  public void optimize(PinotQuery query, @Nullable TableConfig tableConfig, @Nullable Schema schema) {
+    // If schema is missing or doesn't have any JSON columns, there is no need to run this optimizer.
+    if (schema == null || !schema.hasJSONColumn()) {
+      return;
+    }
+
+    // In SELECT clause, replace JSON path expressions with JSON_EXTRACT_SCALAR function with an alias.
+    List<Expression> expressions = query.getSelectList();
+    for (Expression expression : expressions) {
+      Pair<String, Boolean> result = optimizeJsonIdentifier(expression, schema, DataSchema.ColumnDataType.STRING);
+      if (expression.getType() == ExpressionType.FUNCTION && !expression.getFunctionCall().getOperator().equals("AS")
+          && result.getSecond()) {
+        // Since this is not an AS function (user-specified alias) and the function or its arguments contain json path
+        // expression, set an alias for the expression after replacing json path expression with JSON_EXTRACT_SCALAR
+        // function.
+        Function aliasFunction = getAliasFunction(result.getFirst(), expression.getFunctionCall());
+        expression.setFunctionCall(aliasFunction);
+      }
+    }
+
+    // In WHERE clause, replace JSON path predicates with JSON_MATCH (indexed column) or JSON_EXTRACT_SCALAR.
+    Expression filter = query.getFilterExpression();
+    if (filter != null) {
+      optimizeJsonPredicate(filter, tableConfig, schema);
+    }
+
+    // In GROUP BY clause, replace JSON path expressions with JSON_EXTRACT_SCALAR function without an alias.
+    expressions = query.getGroupByList();
+    if (expressions != null) {
+      for (Expression expression : expressions) {
+        optimizeJsonIdentifier(expression, schema, DataSchema.ColumnDataType.STRING);
+      }
+    }
+
+    // In ORDER BY clause, replace JSON path expression with JSON_EXTRACT_SCALAR. This expression must match the
+    // corresponding SELECT list expression except for the alias.
+    expressions = query.getOrderByList();
+    if (expressions != null) {
+      for (Expression expression : expressions) {
+        optimizeJsonIdentifier(expression, schema, DataSchema.ColumnDataType.STRING);
+      }
+    }
+
+    // In HAVING clause, replace JSON path expressions with JSON_EXTRACT_SCALAR. This expression must match the
+    // corresponding SELECT list expression except for the alias.
+    Expression expression = query.getHavingExpression();
+    if (expression != null) {
+      optimizeJsonIdentifier(expression, schema, DataSchema.ColumnDataType.STRING);
+    }
+  }
+
+  /**
+   * Replace a json path expression with a JSON_EXTRACT_SCALAR function (in place) and compute an alias for it.
+   * @param expression input expression to rewrite into JSON_EXTRACT_SCALAR function if the expression is json path.
+   * @param schema schema used to tell plain columns apart from json paths rooted at a JSON column.
+   * @param outputDataType output datatype of JSON_EXTRACT_SCALAR, which depends upon the enclosing function.
+   * @return A {@link Pair} of values where the first value is alias for the input expression and second
+   * value indicates whether json path expression was found (true) or not (false) in the expression.
+   */
+  private static Pair<String, Boolean> optimizeJsonIdentifier(Expression expression, @Nullable Schema schema,
+      DataSchema.ColumnDataType outputDataType) {
+    switch (expression.getType()) {
+      case LITERAL:
+        return new Pair<>(getLiteralSQL(expression.getLiteral(), true), false);
+      case IDENTIFIER: {
+        boolean hasJsonPathExpression = false;
+        String columnName = expression.getIdentifier().getName();
+        if (!schema.hasColumn(columnName)) {
+          String[] parts = getIdentifierParts(expression.getIdentifier());
+          if (parts.length > 1 && isValidJSONColumn(parts[0], schema)) {
+            // replace <column-name>.<json-path> with json_extract_scalar(<column-name>, '<json-path>', '<output-type>', <null-value>)
+            Function jsonExtractScalarFunction = getJsonExtractFunction(parts, outputDataType);
+            expression.setIdentifier(null);
+            expression.setType(ExpressionType.FUNCTION);
+            expression.setFunctionCall(jsonExtractScalarFunction);
+            hasJsonPathExpression = true;
+          }
+        }
+        return new Pair<>(columnName, hasJsonPathExpression);
+      }
+      case FUNCTION: {
+        Function function = expression.getFunctionCall();
+        List<Expression> operands = function.getOperands();
+
+        boolean hasJsonPathExpression = false;
+        StringBuffer alias = new StringBuffer();
+        if (function.getOperator().toUpperCase().equals("AS")) {
+          // We don't need to compute an alias for AS function since AS function defines its own alias.
+          hasJsonPathExpression = optimizeJsonIdentifier(operands.get(0), schema, outputDataType).getSecond();
+          alias.append(function.getOperands().get(1).getIdentifier().getName());
+        } else {
+          // For all functions besides AS function, process the operands and compute the alias.
+          alias.append(function.getOperator().toLowerCase()).append("(");
+
+          // Output datatype of JSON_EXTRACT_SCALAR will depend upon the function within which json path expression appears.
+          outputDataType = getJsonExtractOutputDataType(function);
+
+          for (int i = 0; i < operands.size(); ++i) {
+            // recursively check to see if there is a <json-column>.<json-path> identifier in this expression.
+            Pair<String, Boolean> operandResult = optimizeJsonIdentifier(operands.get(i), schema, outputDataType);
+            hasJsonPathExpression |= operandResult.getSecond();
+            if (i > 0) {
+              alias.append(",");
+            }
+            alias.append(operandResult.getFirst());
+          }
+          alias.append(")");
+        }
+
+        return new Pair<>(alias.toString(), hasJsonPathExpression);
+      }
+    }
+
+    return new Pair<>("", false);
+  }
+
+  /**
+   * Example:
+   *   Input:
+   *     alias   : "jsoncolumn.x.y.z",
+   *     function: JSON_EXTRACT_SCALAR(jsoncolumn, '$.x.y.z', 'STRING', 'null')
+   *   Output: AS(JSON_EXTRACT_SCALAR(jsoncolumn, '$.x.y.z', 'STRING', 'null'), jsoncolumn.x.y.z)
+   *
+   * @return a Function with "AS" operator that wraps another function; the alias is added as an identifier operand.
+   */
+  private static Function getAliasFunction(String alias, Function function) {
+    Function aliasFunction = new Function("AS");
+
+    List<Expression> operands = new ArrayList<>();
+    Expression expression = new Expression(ExpressionType.FUNCTION);
+    expression.setFunctionCall(function);
+    operands.add(expression);
+    operands.add(RequestUtils.createIdentifierExpression(alias));
+    aliasFunction.setOperands(operands);
+
+    return aliasFunction;
+  }
+
+  /**
+   * Example:
+   * Input : parts = ["jsoncolumn", "x","y","z[2]"], dataType = STRING
+   * Output: JSON_EXTRACT_SCALAR(jsoncolumn,'$.x.y.z[2]','STRING','null')
+   *
+   * @param parts All the subparts of a fully qualified identifier (json path expression); parts[0] is the column.
+   * @param dataType Output datatype of JSON_EXTRACT_SCALAR function.
+   * @return a Function with JSON_EXTRACT_SCALAR operator created using parts of fully qualified identifier name.
+   */
+  private static Function getJsonExtractFunction(String[] parts, DataSchema.ColumnDataType dataType) {
+    Function jsonExtractScalarFunction = new Function("JSON_EXTRACT_SCALAR");
+    List<Expression> operands = new ArrayList<>();
+    operands.add(RequestUtils.createIdentifierExpression(parts[0]));
+    operands.add(RequestUtils.createLiteralExpression(new StringLiteralAstNode(getJsonPath(parts, false))));
+    operands.add(RequestUtils.createLiteralExpression(new StringLiteralAstNode(dataType.toString())));
+
+    operands.add(RequestUtils.createLiteralExpression(getDefaultNullValueForType(dataType)));
+    jsonExtractScalarFunction.setOperands(operands);
+    return jsonExtractScalarFunction;
+  }
+
+  /**
+   * Example 1 (jsonColumn has a JSON index):
+   * Input : "jsonColumn.name.first = 'daffy'"
+   * Output: "JSON_MATCH(jsonColumn, '\"$.name.first\" = ''daffy''')"
+   *
+   * Example 2 (jsonColumn has a JSON index):
+   * Input : "jsonColumn.id = 4"
+   * Output: "JSON_MATCH(jsonColumn, '\"$.id\" = 4')"
+   */
+  private static void optimizeJsonPredicate(Expression expression, @Nullable TableConfig tableConfig,
+      @Nullable Schema schema) {
+    if (expression.getType() == ExpressionType.FUNCTION) {
+      Function function = expression.getFunctionCall();
+      String operator = function.getOperator();
+      FilterKind kind = FilterKind.valueOf(operator);
+      List<Expression> operands = function.getOperands();
+      switch (kind) {
+        case AND:
+        case OR: {
+          operands.forEach(operand -> optimizeJsonPredicate(operand, tableConfig, schema));
+          break;
+        }
+        case EQUALS:
+        case NOT_EQUALS:
+        case GREATER_THAN:
+        case GREATER_THAN_OR_EQUAL:
+        case LESS_THAN:
+        case LESS_THAN_OR_EQUAL: {
+          Expression left = operands.get(0);
+          Expression right = operands.get(1);
+          if (left.getType() == ExpressionType.IDENTIFIER && right.getType() == ExpressionType.LITERAL) {
+            if (!schema.hasColumn(left.getIdentifier().getName())) {
+              String[] parts = getIdentifierParts(left.getIdentifier());
+              if (parts.length > 1 && isValidJSONColumn(parts[0], schema)) {
+                if (isIndexedJSONColumn(parts[0], tableConfig)) {
+                  Function jsonMatchFunction = new Function("JSON_MATCH");
+
+                  List<Expression> jsonMatchFunctionOperands = new ArrayList<>();
+                  jsonMatchFunctionOperands.add(RequestUtils.createIdentifierExpression(parts[0]));
+                  jsonMatchFunctionOperands.add(RequestUtils.createLiteralExpression(new StringLiteralAstNode(
+                      getJsonPath(parts, true) + getOperatorSQL(kind) + getLiteralSQL(right.getLiteral(), false))));
+                  jsonMatchFunction.setOperands(jsonMatchFunctionOperands);
+
+                  expression.setFunctionCall(jsonMatchFunction);
+                } else {
+                  left.clear();
+                  left.setType(ExpressionType.FUNCTION);
+                  left.setFunctionCall(getJsonExtractFunction(parts, getColumnTypeForLiteral(right.getLiteral())));
+                }
+              }
+            }
+          }
+          break;
+        }
+        case IS_NULL:
+        case IS_NOT_NULL: {
+          Expression operand = operands.get(0);
+          if (operand.getType() == ExpressionType.IDENTIFIER) {
+            if (!schema.hasColumn(operand.getIdentifier().getName())) {
+              String[] parts = getIdentifierParts(operand.getIdentifier());
+              if (parts.length > 1 && isValidJSONColumn(parts[0], schema)) {
+                if (isIndexedJSONColumn(parts[0], tableConfig)) {
+                  Function jsonMatchFunction = new Function("JSON_MATCH");
+
+                  List<Expression> jsonMatchFunctionOperands = new ArrayList<>();
+                  jsonMatchFunctionOperands.add(RequestUtils.createIdentifierExpression(parts[0]));
+                  jsonMatchFunctionOperands.add(RequestUtils.createLiteralExpression(
+                      new StringLiteralAstNode(getJsonPath(parts, true) + getOperatorSQL(kind))));
+                  jsonMatchFunction.setOperands(jsonMatchFunctionOperands);
+
+                  expression.setFunctionCall(jsonMatchFunction);
+                } else {
+                  operand.clear();
+                  operand.setType(ExpressionType.FUNCTION);
+                  operand.setFunctionCall(getJsonExtractFunction(parts, DataSchema.ColumnDataType.JSON));
+                }
+              }
+            }
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   *  @return A string array containing all the parts of an identifier. An identifier may have one or more parts that
+   *  are joined together using <DOT>. For example the identifier "jsonColumn.name.first" consists of "jsonColumn"
+   *  (name of column), "name" (json path), and "first" (json path). The last two parts when joined together
+   *  (name.first) represent a JSON path expression.
+   */
+  private static String[] getIdentifierParts(Identifier identifier) {
+    return StringUtils.split(identifier.getName(), '.');
+  }
+
+  /**
+   * Builds a json path expression when given identifier parts. For example, given [jsonColumn, name, first], this
+   * function will return "$.name.first" as json path expression.
+   * @param parts identifier parts; parts[0] (the column name) is not included in the path.
+   * @param applyDoubleQuote delimit json path with double quotes if true; otherwise, don't delimit json path.
+   * @return JSON path expression associated with the given identifier parts.
+   */
+  private static String getJsonPath(String[] parts, boolean applyDoubleQuote) {
+    StringBuilder builder = new StringBuilder();
+    if (applyDoubleQuote) {
+      builder.append("\"");
+    }
+
+    builder.append("$");
+    for (int i = 1; i < parts.length; i++) {
+      builder.append(".").append(parts[i]);
+    }
+
+    if (applyDoubleQuote) {
+      builder.append("\"");
+    }
+
+    return builder.toString();
+  }
+
+  /** @return true if schema is non-null and has the specified column with datatype JSON; otherwise, return false */
+  private static boolean isValidJSONColumn(String columnName, @Nullable Schema schema) {
+    return schema != null && schema.hasColumn(columnName) && schema.getFieldSpecFor(columnName).getDataType()
+        .equals(FieldSpec.DataType.JSON);
+  }
+
+  /** @return true if the column is listed in the table's JSON index columns; false if unlisted or no config. */
+  private static boolean isIndexedJSONColumn(String columnName, @Nullable TableConfig tableConfig) {
+    if (tableConfig == null) {
+      return false;
+    }
+
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    if (indexingConfig == null) {
+      return false;
+    }
+
+    List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
+    if (jsonIndexColumns == null) {
+      return false;
+    }
+
+    return jsonIndexColumns.contains(columnName);
+  }
+
+  /** @return operator as SQL with surrounding spaces (IS NULL / IS NOT NULL: leading space only); " " if unmapped. */
+  private static String getOperatorSQL(FilterKind kind) {
+    switch (kind) {
+      case IS_NULL:
+        return " IS NULL";
+      case IS_NOT_NULL:
+        return " IS NOT NULL";
+      case IN:
+        return " IN ";
+      case NOT_IN:
+        return " NOT IN ";
+      case EQUALS:
+        return " = ";
+      case NOT_EQUALS:
+        return " != ";
+      case LESS_THAN:
+        return " < ";
+      case LESS_THAN_OR_EQUAL:
+        return " <= ";
+      case GREATER_THAN:
+        return " > ";
+      case GREATER_THAN_OR_EQUAL:
+        return " >= ";
+    }
+    return " ";
+  }
+
+  /**
+   * @param literal {@link Literal} to convert to a {@link String}.
+   * @param aliasing When true, generate string for use in an alias; otherwise, generate SQL string representation.
+   * @return Literal value converted into either an alias name or an SQL string. In SQL, BYTE, SHORT, STRING, and
+   * BINARY values are single-quoted. In an alias the whole value is single-quoted (around a STRING's own quotes).
+   * */
+  private static String getLiteralSQL(Literal literal, boolean aliasing) {
+    String sqlQuote = aliasing ? "" : "'";
+    StringBuilder result = new StringBuilder();
+    result.append(aliasing ? "'" : "");
+    switch (literal.getSetField()) {
+      case BOOL_VALUE:
+        // A BOOL_VALUE literal carries its value in the bool field, so it must be read with getBoolValue().
+        result.append(literal.getBoolValue());
+        break;
+      case BYTE_VALUE:
+        result.append(sqlQuote).append(literal.getByteValue()).append(sqlQuote);
+        break;
+      case SHORT_VALUE:
+        result.append(sqlQuote).append(literal.getShortValue()).append(sqlQuote);
+        break;
+      case INT_VALUE:
+        result.append(literal.getIntValue());
+        break;
+      case LONG_VALUE:
+        result.append(literal.getLongValue());
+        break;
+      case DOUBLE_VALUE:
+        result.append(literal.getDoubleValue());
+        break;
+      case STRING_VALUE:
+        result.append("'").append(literal.getStringValue()).append("'");
+        break;
+      case BINARY_VALUE:
+        // NOTE(review): String.valueOf(...) of the binary value is kept as-is; confirm it renders bytes as intended.
+        result.append(sqlQuote).append(String.valueOf(literal.getBinaryValue())).append(sqlQuote);
+        break;
+    }
+    result.append(aliasing ? "'" : "");
+    return result.toString();
+  }
+
+  /** @return column data type for the literal's set field: SHORT/INT/LONG map to LONG, unmapped fields to STRING. */
+  private static DataSchema.ColumnDataType getColumnTypeForLiteral(Literal literal) {
+    switch (literal.getSetField()) {
+      case BYTE_VALUE:
+      case BINARY_VALUE:
+        return DataSchema.ColumnDataType.BYTES;
+      case DOUBLE_VALUE:
+        return DataSchema.ColumnDataType.DOUBLE;
+      case SHORT_VALUE:
+      case INT_VALUE:
+      case LONG_VALUE:
+        return DataSchema.ColumnDataType.LONG;
+      case BOOL_VALUE:
+        return DataSchema.ColumnDataType.BOOLEAN;
+      default:
+        return DataSchema.ColumnDataType.STRING;
+    }
+  }
+
+  /** @return default dimension null value, as a {@link LiteralAstNode}, for the datatype; STRING's if unmapped. */
+  private static LiteralAstNode getDefaultNullValueForType(DataSchema.ColumnDataType dataType) {
+    switch (dataType) {
+      case DOUBLE:
+        return DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE_AST;
+      case FLOAT:
+        return DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT_AST;
+      case LONG:
+        return DEFAULT_DIMENSION_NULL_VALUE_OF_LONG_AST;
+      case INT:
+        return DEFAULT_DIMENSION_NULL_VALUE_OF_INT_AST;
+      default:
+        // STRING and every other datatype share the string default.
+        return DEFAULT_DIMENSION_NULL_VALUE_OF_STRING_AST;
+    }
+  }
+
+  /** Picks the JSON_EXTRACT_SCALAR output datatype from the function within which the json path expression appears. */
+  private static DataSchema.ColumnDataType getJsonExtractOutputDataType(Function function) {
+    String operator = function.getOperator().toUpperCase();
+    if (numericalFunctions.contains(operator)) {
+      // Argument of a numeric function: the rewritten JSON_EXTRACT_SCALAR call returns 'DOUBLE'.
+      return DataSchema.ColumnDataType.DOUBLE;
+    }
+    if (datetimeFunctions.contains(operator)) {
+      // Argument of a datetime function: the rewritten JSON_EXTRACT_SCALAR call returns 'LONG'.
+      return DataSchema.ColumnDataType.LONG;
+    }
+    // Argument of any other function: 'STRING'.
+    return DataSchema.ColumnDataType.STRING;
+  }
+
+  /** @return upper-cased names of all methods declared by ArithmeticFunctions plus all aggregation function names. */
+  public static Set<String> getNumericalFunctionList() {
+    Set<String> functionNames = new HashSet<>();
+
+    // Arithmetic functions: every method declared on ArithmeticFunctions (getDeclaredMethods() also returns
+    // non-public methods, if the class has any).
+    for (Method arithmeticMethod : ArithmeticFunctions.class.getDeclaredMethods()) {
+      functionNames.add(arithmeticMethod.getName().toUpperCase());
+    }
+
+    // Aggregation functions: every AggregationFunctionType constant.
+    for (AggregationFunctionType aggregationType : AggregationFunctionType.values()) {
+      functionNames.add(aggregationType.getName().toUpperCase());
+    }
+
+    return functionNames;
+  }
+
+  /** Set of DateTime function names (upper-cased), i.e. functions whose input is expected to be of long type. */
+  public static Set<String> getDateTimeFunctionList() {
+    Set<String> functionNames = new HashSet<>();
+    // Collect the name of every method declared on DateTimeFunctions.
+    for (Method dateTimeMethod : DateTimeFunctions.class.getDeclaredMethods()) {
+      functionNames.add(dateTimeMethod.getName().toUpperCase());
+    }
+
+    return functionNames;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/StatementOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/StatementOptimizer.java
new file mode 100644
index 0000000..3ebfe56
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/statement/StatementOptimizer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.statement;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Contract for optimizers that target a particular class of SQL statement. An implementation may modify several or
+ * all parts of the SQL statement it is given.
+ */
+public interface StatementOptimizer {
+
+  /** Optimize the given SQL statement; the table config and the schema may each be null. */
+  void optimize(PinotQuery query, @Nullable TableConfig tableConfig, @Nullable Schema schema);
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizerTest.java
new file mode 100644
index 0000000..09ca628
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/statement/JsonStatementOptimizerTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.statement;
+
+import java.util.Arrays;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.query.optimizer.QueryOptimizer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test cases to verify that {@link JsonStatementOptimizer} is properly rewriting queries that use JSON path expressions
+ * into equivalent queries that use JSON_MATCH and JSON_EXTRACT_SCALAR functions.
+ */
+public class JsonStatementOptimizerTest {
+  private static final QueryOptimizer OPTIMIZER = new QueryOptimizer();
+  private static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("intColumn", FieldSpec.DataType.INT)
+          .addSingleValueDimension("longColumn", FieldSpec.DataType.LONG)
+          .addSingleValueDimension("stringColumn", FieldSpec.DataType.STRING)
+          .addSingleValueDimension("jsonColumn", FieldSpec.DataType.JSON).build();
+  private static final TableConfig TABLE_CONFIG_WITH_INDEX = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+      .setJsonIndexColumns(Arrays.asList("jsonColumn")).build();
+  private static final TableConfig TABLE_CONFIG_WITHOUT_INDEX = null;
+
+  /** Test that a json path expression in SELECT list is properly converted to a JSON_EXTRACT_SCALAR function within an AS function. */
+  @Test
+  public void testJsonSelect() {
+    // SELECT using json column.
+    assertEqualsQuery("SELECT jsonColumn FROM testTable",
+        "SELECT jsonColumn FROM testTable",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    // SELECT using a simple json path expression.
+    assertEqualsQuery("SELECT jsonColumn.x FROM testTable",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.x', 'STRING', 'null') AS \"jsonColumn.x\" FROM testTable",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    // SELECT using json path expressions with array addressing.
+    assertEqualsQuery("SELECT jsonColumn.data[0][1].a.b[0] FROM testTable",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.data[0][1].a.b[0]', 'STRING', 'null') AS \"jsonColumn.data[0][1].a.b[0]\" FROM testTable",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    // SELECT using json path expressions within double quotes.
+    assertEqualsQuery("SELECT \"jsonColumn.a.b.c[0][1][2][3].d.e.f[0].g\" FROM testTable",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.a.b.c[0][1][2][3].d.e.f[0].g', 'STRING', 'null') AS \"jsonColumn.a.b.c[0][1][2][3].d.e.f[0].g\" FROM testTable",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+  }
+
+  /** Test that a predicate comparing a json path expression with literal is properly converted into a JSON_MATCH function. */
+  @Test
+  public void testJsonFilter() {
+    // Comparing json path expression with a string value (with JSON index: JSON_MATCH; without: JSON_EXTRACT_SCALAR).
+    assertEqualsQuery("SELECT * FROM testTable WHERE jsonColumn.name.first = 'daffy'",
+        "SELECT * FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.name.first\" = ''daffy''')", TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    assertEqualsQuery("SELECT * FROM testTable WHERE jsonColumn.name.first = 'daffy'",
+        "SELECT * FROM testTable WHERE JSON_EXTRACT_SCALAR(jsonColumn, '$.name.first', 'STRING', 'null') = 'daffy'", TABLE_CONFIG_WITHOUT_INDEX, SCHEMA);
+
+    // Comparing json path expression with a numerical value.
+    assertEqualsQuery("SELECT * FROM testTable WHERE jsonColumn.id = 101",
+        "SELECT * FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" = 101')", TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    assertEqualsQuery("SELECT * FROM testTable WHERE jsonColumn.id = 101",
+        "SELECT * FROM testTable WHERE JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'LONG', -9223372036854775808) = 101", TABLE_CONFIG_WITHOUT_INDEX, SCHEMA);
+
+    // Comparing json path expression with a numerical value and checking for null value.
+    assertEqualsQuery("SELECT * FROM testTable WHERE jsonColumn.id IS NOT NULL AND jsonColumn.id = 101",
+        "SELECT * FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" IS NOT NULL') AND JSON_MATCH(jsonColumn, '\"$.id\" = 101')",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    assertEqualsQuery("SELECT * FROM testTable WHERE jsonColumn.id IS NOT NULL AND jsonColumn.id = 101",
+        "SELECT * FROM testTable WHERE JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'JSON', 'null') IS NOT NULL AND JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'LONG', -9223372036854775808) = 101",
+        TABLE_CONFIG_WITHOUT_INDEX, SCHEMA);
+  }
+
+  /** Test that a json path expression in GROUP BY clause is properly converted into a JSON_EXTRACT_SCALAR function. */
+  @Test
+  public void testJsonGroupBy() {
+    assertEqualsQuery("SELECT jsonColumn.id, count(*) FROM testTable GROUP BY jsonColumn.id",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'STRING', 'null') AS \"jsonColumn.id\", count(*) FROM testTable GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'STRING', 'null')",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    assertEqualsQuery("SELECT jsonColumn.id, count(*) FROM testTable GROUP BY jsonColumn.id",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'STRING', 'null') AS \"jsonColumn.id\", count(*) FROM testTable GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'STRING', 'null')",
+        TABLE_CONFIG_WITHOUT_INDEX, SCHEMA);
+  }
+
+  /** Test that a json path expression in HAVING clause is properly converted into a JSON_EXTRACT_SCALAR function. */
+  @Test
+  public void testJsonGroupByHaving() {
+    assertEqualsQuery(
+        "SELECT jsonColumn.name.last, count(*) FROM testTable GROUP BY jsonColumn.name.last HAVING jsonColumn.name.last = 'mouse'",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') AS \"jsonColumn.name.last\", count(*) FROM testTable GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') HAVING JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') = 'mouse'",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    assertEqualsQuery(
+        "SELECT jsonColumn.name.last, count(*) FROM testTable GROUP BY jsonColumn.name.last HAVING jsonColumn.name.last = 'mouse'",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') AS \"jsonColumn.name.last\", count(*) FROM testTable GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') HAVING JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') = 'mouse'",
+        TABLE_CONFIG_WITHOUT_INDEX, SCHEMA);
+  }
+
+  /** Test a complex SQL statement with json path expression in SELECT, WHERE, and GROUP BY clauses. */
+  @Test
+  public void testJsonSelectFilterGroupBy() {
+    assertEqualsQuery(
+        "SELECT jsonColumn.name.last, count(*) FROM testTable WHERE jsonColumn.id = 101 GROUP BY jsonColumn.name.last",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') AS \"jsonColumn.name.last\", count(*) FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" = 101') GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null')",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    assertEqualsQuery(
+        "SELECT jsonColumn.name.last, count(*) FROM testTable WHERE jsonColumn.id = 101 GROUP BY jsonColumn.name.last",
+        "SELECT JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null') AS \"jsonColumn.name.last\", count(*) FROM testTable WHERE JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'LONG', -9223372036854775808) = 101 GROUP BY JSON_EXTRACT_SCALAR(jsonColumn, '$.name.last', 'STRING', 'null')",
+        TABLE_CONFIG_WITHOUT_INDEX, SCHEMA);
+  }
+
+  /** Test transform functions (string and date) over json path expressions in SELECT clause, with null checks in WHERE. */
+  @Test
+  public void testTransformFunctionOverJsonPathSelectExpression() {
+    // Apply string transform function on json path expression.
+    assertEqualsQuery("SELECT UPPER(jsonColumn.name.first) FROM testTable",
+        "SELECT UPPER(JSON_EXTRACT_SCALAR(jsonColumn, '$.name.first', 'STRING', 'null')) AS \"upper(jsonColumn.name.first)\" FROM testTable",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    // Apply date transform function on json path expression and check for IS NULL
+    assertEqualsQuery("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NULL",
+        "SELECT FROMEPOCHDAYS(JSON_EXTRACT_SCALAR(jsonColumn, '$.days', 'LONG', -9223372036854775808)) AS \"fromepochdays(jsonColumn.days)\" FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.days\" IS NULL')",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    // Apply date transform function on json path expression and check for IS NOT NULL
+    assertEqualsQuery("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NOT NULL",
+        "SELECT FROMEPOCHDAYS(JSON_EXTRACT_SCALAR(jsonColumn, '$.days', 'LONG', -9223372036854775808)) AS \"fromepochdays(jsonColumn.days)\" FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.days\" IS NOT NULL')",
+        TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    assertEqualsQuery("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NOT NULL",
+        "SELECT FROMEPOCHDAYS(JSON_EXTRACT_SCALAR(jsonColumn, '$.days', 'LONG', -9223372036854775808)) AS \"fromepochdays(jsonColumn.days)\" FROM testTable WHERE JSON_EXTRACT_SCALAR(jsonColumn, '$.days', 'JSON', 'null') IS NOT NULL",
+        TABLE_CONFIG_WITHOUT_INDEX, SCHEMA);
+  }
+
+  /** Test a numerical function over json path expression in SELECT clause. */
+  @Test
+  public void testNumericalFunctionOverJsonPathSelectExpression() {
+
+    // Test without user-specified alias.
+    assertEqualsQuery("SELECT MAX(jsonColumn.id) FROM testTable",
+        "SELECT MAX(JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'DOUBLE', '" + Double.NEGATIVE_INFINITY
+            + "')) AS \"max(jsonColumn.id)\" FROM testTable", TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    // Test with user-specified alias.
+    assertEqualsQuery("SELECT MAX(jsonColumn.id) AS x FROM testTable",
+        "SELECT MAX(JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'DOUBLE', '" + Double.NEGATIVE_INFINITY
+            + "')) AS x FROM testTable", TABLE_CONFIG_WITH_INDEX, SCHEMA);
+
+    // Test with nested function calls (minus function being used within max function).
+    assertEqualsQuery("SELECT MAX(jsonColumn.id - 5) FROM testTable",
+        "SELECT MAX(JSON_EXTRACT_SCALAR(jsonColumn, '$.id', 'DOUBLE', '" + Double.NEGATIVE_INFINITY
+            + "') - 5) AS \"max(minus(jsonColumn.id,'5'))\" FROM testTable", TABLE_CONFIG_WITH_INDEX, SCHEMA);
+  }
+
+  /**
+   * Compiles both queries, runs each through the optimizer with the given table config and schema, and asserts that
+   * the string form of the optimized first query equals the (adjusted) string form of the optimized second query.
+   */
+  private static void assertEqualsQuery(String queryOriginal, String queryAfterRewrite, TableConfig config,
+      Schema schema) {
+    BrokerRequest userBrokerRequest = SQL_COMPILER.compileToBrokerRequest(queryOriginal);
+    PinotQuery userQuery = userBrokerRequest.getPinotQuery();
+    OPTIMIZER.optimize(userQuery, config, schema);
+
+    BrokerRequest rewrittenBrokerRequest = SQL_COMPILER.compileToBrokerRequest(queryAfterRewrite);
+    PinotQuery rewrittenQuery = rewrittenBrokerRequest.getPinotQuery();
+    OPTIMIZER.optimize(rewrittenQuery, config, schema);
+
+    // Currently there is no way to specify Double.NEGATIVE_INFINITY in SQL, so in the test cases we specify string '-Infinity' as
+    // default null value, but change "stringValue:-Infinity" to "doubleValue:-Infinity" to adjust for internal rewrite.
+    Assert.assertEquals(userQuery.toString(),
+        rewrittenQuery.toString().replace("stringValue:-Infinity", "doubleValue:-Infinity"));
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 7582fe2..6bf84a3 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
@@ -40,7 +41,9 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
@@ -55,6 +58,8 @@ public abstract class BaseQueriesTest {
   protected static final Pql2Compiler PQL_COMPILER = new Pql2Compiler();
   protected static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
   protected static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
+  protected static final QueryOptimizer OPTIMIZER = new QueryOptimizer();
+
   protected static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
 
   protected abstract String getFilter();
@@ -230,4 +235,38 @@ public abstract class BaseQueriesTest {
     brokerReduceService.shutDown();
     return brokerResponse;
   }
+
+  /**
+   * Run optimized SQL query on multiple index segments, without a table config and with the default plan maker.
+   * <p>Use this to test the whole flow from server to broker.
+   * <p>The result should be equivalent to querying 4 identical index segments.
+   */
+  protected BrokerResponseNative getBrokerResponseForOptimizedSqlQuery(String sqlQuery, @Nullable Schema schema) {
+    return getBrokerResponseForOptimizedSqlQuery(sqlQuery, (TableConfig) null, schema, PLAN_MAKER);
+  }
+
+  protected BrokerResponseNative getBrokerResponseForOptimizedSqlQuery(String sqlQuery,
+      @Nullable TableConfig config, @Nullable Schema schema) {
+    return getBrokerResponseForOptimizedSqlQuery(sqlQuery, config, schema, PLAN_MAKER); // class-level plan maker
+  }
+
+  /**
+   * Compile and optimize the SQL query, set its GROUP_BY_MODE and RESPONSE_FORMAT query options to SQL, then run it
+   * on multiple index segments with custom plan maker. Use this to test the whole flow from server to broker.
+   * <p>The result should be equivalent to querying 4 identical index segments.
+   */
+  protected BrokerResponseNative getBrokerResponseForOptimizedSqlQuery(String sqlQuery, @Nullable TableConfig config,
+      @Nullable Schema schema, PlanMaker planMaker) {
+    BrokerRequest request = SQL_COMPILER.compileToBrokerRequest(sqlQuery);
+    OPTIMIZER.optimize(request.getPinotQuery(), config, schema);
+    // Reuse the query's option map when it has one; otherwise attach a fresh, empty map to the query.
+    Map<String, String> options = request.getPinotQuery().getQueryOptions();
+    if (options == null) {
+      options = new HashMap<>();
+      request.getPinotQuery().setQueryOptions(options);
+    }
+    options.put(Request.QueryOptionKey.GROUP_BY_MODE, Request.SQL);
+    options.put(Request.QueryOptionKey.RESPONSE_FORMAT, Request.SQL);
+    return getBrokerResponse(BrokerRequestToQueryContextConverter.convert(request), planMaker);
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonPathQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonPathQueriesTest.java
new file mode 100644
index 0000000..c96c0ba
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonPathQueriesTest.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class JsonPathQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "JsonPathQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final int NUM_RECORDS = 14;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String JSON_COLUMN = "jsonColumn";
+  private static final String JSON_COLUMN_WITHOUT_INDEX = "jsonColumnWithoutIndex";
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(JSON_COLUMN, FieldSpec.DataType.JSON)
+      .addSingleValueDimension(JSON_COLUMN_WITHOUT_INDEX, FieldSpec.DataType.JSON).build();
+
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  GenericRow createRecord(int intValue, long longValue, String stringValue, String jsonValue) {
+    GenericRow record = new GenericRow();
+    record.putValue(INT_COLUMN, intValue);
+    record.putValue(LONG_COLUMN, longValue);
+    record.putValue(STRING_COLUMN, stringValue);
+    record.putValue(JSON_COLUMN, jsonValue);
+    record.putValue(JSON_COLUMN_WITHOUT_INDEX, jsonValue);
+
+    return record;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    records.add(createRecord(1, 1, "daffy duck",
+        "{\"name\": {\"first\": \"daffy\", \"last\": \"duck\"}, \"id\": 101, \"data\": [\"a\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(2, 2, "mickey mouse",
+        "{\"name\": {\"first\": \"mickey\", \"last\": \"mouse\"}, \"id\": 111, \"data\": [\"e\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(3, 3, "donald duck",
+        "{\"name\": {\"first\": \"donald\", \"last\": \"duck\"}, \"id\": 121, \"data\": [\"f\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(4, 4, "scrooge mcduck",
+        "{\"name\": {\"first\": \"scrooge\", \"last\": \"mcduck\"}, \"id\": 131, \"data\": [\"g\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(5, 5, "minnie mouse",
+        "{\"name\": {\"first\": \"minnie\", \"last\": \"mouse\"}, \"id\": 141, \"data\": [\"h\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(6, 6, "daisy duck",
+        "{\"name\": {\"first\": \"daisy\", \"last\": \"duck\"}, \"id\": 161.5, \"data\": [\"i\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(7, 7, "pluto dog",
+        "{\"name\": {\"first\": \"pluto\", \"last\": \"dog\"}, \"id\": 161, \"data\": [\"j\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(8, 8, "goofy dwag",
+        "{\"name\": {\"first\": \"goofy\", \"last\": \"dwag\"}, \"id\": 171, \"data\": [\"k\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(9, 9, "ludwik von drake",
+        "{\"name\": {\"first\": \"ludwik\", \"last\": \"von drake\"}, \"id\": 181, \"data\": [\"l\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(10, 10, "nested array",
+        "{\"name\":{\"first\":\"nested\",\"last\":\"array\"},\"id\":111,\"data\":[{\"e\":[{\"x\":[{\"i1\":1,\"i2\":2}]},{\"y\":[{\"i1\":1,\"i2\":2}]},{\"z\":[{\"i1\":1,\"i2\":2}]}]},{\"b\":[{\"x\":[{\"i1\":1,\"i2\":2}]},{\"y\":[{\"i1\":1,\"i2\":2}]},{\"z\":[{\"i1\":10,\"i2\":20}]}]}]}"));
+    records.add(createRecord(11, 11, "multi-dimensional-1 array",
+        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
+    records.add(createRecord(12, 12, "multi-dimensional-2 array",
+        "{\"name\": {\"first\": \"multi-dimensional-2\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
+    records.add(createRecord(13, 13, "multi-dimensional-1 array",
+        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
+    records.add(createRecord(13, 13, "days",
+        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"days\": 111}"));
+
+    List<String> jsonIndexColumns = new ArrayList<>();
+    jsonIndexColumns.add("jsonColumn");
+    TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setTableConfig(TABLE_CONFIG);
+    indexLoadingConfig.setJsonIndexColumns(new HashSet<String>(jsonIndexColumns));
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  private void checkresult(String query, Object[][] expecteds) {
+    BrokerResponseNative response1 = getBrokerResponseForOptimizedSqlQuery(query, TABLE_CONFIG, SCHEMA);
+    List<Object[]> rows = response1.getResultTable().getRows();
+
+    Assert.assertEquals(rows.size(), expecteds.length);
+    for (int i = 0; i < rows.size(); i++) {
+      Object[] actual = rows.get(i);
+      Object[] expected = expecteds[i];
+      Assert.assertEquals(actual, expected);
+    }
+  }
+
+  /** Test that a json path expression in SELECT list is properly converted to a JSON_EXTRACT_SCALAR function within an AS function. */
+  @Test
+  public void testJsonSelect() {
+    // SELECT using a simple json path expression.
+    Object[][] expecteds1 = {{"duck"}, {"mouse"}, {"duck"}};
+    checkresult("SELECT jsonColumn.name.last FROM testTable LIMIT 3", expecteds1);
+    //checkresult("SELECT jsonColumnWithoutIndex.name.last FROM testTable LIMIT 3", expecteds1);
+
+    Object[][] expecteds2 =
+        {{"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"1"}};
+    checkresult("SELECT jsonColumn.data[0].e[2].z[0].i1 FROM testTable", expecteds2);
+    //checkresult("SELECT jsonColumnWithoutIndex.data[0].e[2].z[0].i1 FROM testTable", expecteds2);
+  }
+
+  /** Test that a predicate comparing a json path expression with literal is properly converted into a JSON_MATCH function. */
+  @Test
+  public void testJsonFilter() {
+    // Comparing json path expression with a string value.
+    Object[][] expecteds1 =
+        {{1, "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}", "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}", 1L, "daffy duck"}};
+    checkresult("SELECT * FROM testTable WHERE jsonColumn.name.first = 'daffy' LIMIT 1", expecteds1);
+    checkresult("SELECT * FROM testTable WHERE jsonColumnWithoutIndex.name.first = 'daffy' LIMIT 1", expecteds1);
+
+    // Comparing json path expression with a numerical value.
+    Object[][] expecteds2 =
+        {{1, "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}", "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}", 1L, "daffy duck"}};
+    checkresult("SELECT * FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" = 101') LIMIT 1", expecteds2);
+    try {
+      checkresult("SELECT * FROM testTable WHERE JSON_MATCH(jsonColumnWithoutIndex, '\"$.id\" = 101') LIMIT 1",
+          expecteds2);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert
+          .assertEquals(e.getMessage(), "Cannot apply JSON_MATCH on column: jsonColumnWithoutIndex without json index");
+    }
+
+    // Combining an IS NOT NULL predicate with a numerical equality predicate on the same json path.
+    Object[][] expecteds3 = {{4l}};
+    checkresult(
+        "SELECT count(*) FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" IS NOT NULL') AND JSON_MATCH(jsonColumn, '\"$.id\" = 101')",
+        expecteds3);
+  }
+
+  /** Test that a json path expression in GROUP BY clause is properly converted into a JSON_EXTRACT_SCALAR function. */
+  @Test
+  public void testJsonGroupBy() {
+    Object[][] expecteds1 =
+        {{"111", 20l}, {"101", 4l}, {"null", 4l}, {"181", 4l}, {"161.5", 4l}, {"171", 4l}, {"161", 4l}, {"141", 4l}, {"131", 4l}, {"121", 4l}};
+    checkresult("SELECT jsonColumn.id, count(*) FROM testTable GROUP BY jsonColumn.id", expecteds1);
+    checkresult("SELECT jsonColumnWithoutIndex.id, count(*) FROM testTable GROUP BY jsonColumnWithoutIndex.id",
+        expecteds1);
+  }
+
+  /** Test that a json path expression in HAVING clause is properly converted into a JSON_EXTRACT_SCALAR function. */
+  @Test
+  public void testJsonGroupByHaving() {
+    Object[][] expecteds1 = {{"mouse", 8l}};
+    checkresult(
+        "SELECT jsonColumn.name.last, count(*) FROM testTable GROUP BY jsonColumn.name.last HAVING jsonColumn.name.last = 'mouse'",
+        expecteds1);
+    checkresult(
+        "SELECT jsonColumnWithoutIndex.name.last, count(*) FROM testTable GROUP BY jsonColumnWithoutIndex.name.last HAVING jsonColumnWithoutIndex.name.last = 'mouse'",
+        expecteds1);
+  }
+
+  /** Test a complex SQL statement with json path expression in SELECT, WHERE, and GROUP BY clauses. */
+  @Test
+  public void testJsonSelectFilterGroupBy() {
+    Object[][] expecteds1 = {{"duck", 4l}};
+    checkresult(
+        "SELECT jsonColumn.name.last, count(*) FROM testTable WHERE jsonColumn.id = 101 GROUP BY jsonColumn.name.last",
+        expecteds1);
+    checkresult(
+        "SELECT jsonColumnWithoutIndex.name.last, count(*) FROM testTable WHERE jsonColumnWithoutIndex.id = 101 GROUP BY jsonColumnWithoutIndex.name.last",
+        expecteds1);
+  }
+
+  /** Test transform functions (string and date) over json path expression in SELECT clause. */
+  @Test
+  public void testTransformFunctionOverJsonPathSelectExpression() {
+    // Apply string transform function on json path expression.
+    Object[][] expecteds1 = {{"DAFFY"}};
+    checkresult("SELECT UPPER(jsonColumn.name.first) FROM testTable LIMIT 1", expecteds1);
+    checkresult("SELECT UPPER(jsonColumnWithoutIndex.name.first) FROM testTable LIMIT 1", expecteds1);
+
+    // Apply date transform function on json path expression and check for IS NULL
+    Object[][] expecteds2 = {{Long.MIN_VALUE}};
+    checkresult("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NULL LIMIT 1",
+        expecteds2);
+    try {
+      checkresult(
+          "SELECT FROMEPOCHDAYS(jsonColumnWithoutIndex.days) FROM testTable WHERE jsonColumnWithoutIndex.days IS NULL LIMIT 1",
+          expecteds2);
+      Assert.fail();
+    } catch (BadQueryRequestException e) {
+      Assert
+          .assertEquals(e.getMessage(), "java.lang.UnsupportedOperationException: Unsupported predicate type: IS_NULL");
+    }
+
+    // Apply date transform function on json path expression and check for IS NOT NULL
+    Object[][] expecteds3 = {{9590400000l}};
+    checkresult("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NOT NULL LIMIT 1",
+        expecteds3);
+    try {
+      checkresult(
+          "SELECT FROMEPOCHDAYS(jsonColumnWithoutIndex.days) FROM testTable WHERE jsonColumnWithoutIndex.days IS NOT NULL LIMIT 1",
+          expecteds3);
+      Assert.fail();
+    } catch (BadQueryRequestException e) {
+      Assert.assertEquals(e.getMessage(),
+          "java.lang.UnsupportedOperationException: Unsupported predicate type: IS_NOT_NULL");
+    }
+  }
+
+  /** Test a numerical aggregation function (MAX) over json path expression in SELECT clause. */
+  @Test
+  public void testNumericalFunctionOverJsonPathSelectExpression() {
+
+    // Test without user-specified alias.
+    Object[][] expecteds1 = {{181.0}};
+    checkresult("SELECT MAX(jsonColumn.id) FROM testTable", expecteds1);
+    checkresult("SELECT MAX(jsonColumnWithoutIndex.id) FROM testTable", expecteds1);
+
+    // Test with user-specified alias.
+    Object[][] expecteds2 = {{181.0}};
+    checkresult("SELECT MAX(jsonColumn.id) AS x FROM testTable", expecteds2);
+    checkresult("SELECT MAX(jsonColumnWithoutIndex.id) AS x FROM testTable", expecteds2);
+
+    // Test with nested function calls (minus function being used within max function).
+    Object[][] expecteds3 = {{176.0}};
+    checkresult("SELECT MAX(jsonColumn.id - 5) FROM testTable", expecteds3);
+    checkresult("SELECT MAX(jsonColumnWithoutIndex.id - 5) FROM testTable", expecteds3);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 25b29ad..b8ab67b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -46,6 +46,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.spi.data.FieldSpec.DataType.JSON;
 import static org.apache.pinot.spi.data.FieldSpec.DataType.STRING;
 
 
@@ -80,6 +81,10 @@ public final class Schema implements Serializable {
   private transient final List<String> _metricNames = new ArrayList<>();
   private transient final List<String> _dateTimeNames = new ArrayList<>();
 
+  // Set to true if this schema has a JSON column (used to quickly decide whether to run JsonStatementOptimizer on
+  // queries or not).
+  private boolean _hasJSONColumn;
+
   public static Schema fromFile(File schemaFile)
       throws IOException {
     return JsonUtils.fileToObject(schemaFile, Schema.class);
@@ -211,6 +216,7 @@ public final class Schema implements Serializable {
         throw new UnsupportedOperationException("Unsupported field type: " + fieldType);
     }
 
+    _hasJSONColumn |= fieldSpec.getDataType().equals(JSON);
     _fieldSpecMap.put(columnName, fieldSpec);
   }
 
@@ -256,6 +262,10 @@ public final class Schema implements Serializable {
     return _fieldSpecMap.containsKey(columnName);
   }
 
+  public boolean hasJSONColumn() {
+    return _hasJSONColumn;
+  }
+
   @JsonIgnore
   public Map<String, FieldSpec> getFieldSpecMap() {
     return _fieldSpecMap;
@@ -643,7 +653,8 @@ public final class Schema implements Serializable {
         .isEqualIgnoreOrder(_dateTimeFieldSpecs, that._dateTimeFieldSpecs) && EqualityUtils
         .isEqualIgnoreOrder(_complexFieldSpecs, that._complexFieldSpecs) && EqualityUtils
         .isEqualMap(_fieldSpecMap, that._fieldSpecMap) && EqualityUtils
-        .isEqual(_primaryKeyColumns, that._primaryKeyColumns);
+        .isEqual(_primaryKeyColumns, that._primaryKeyColumns) && EqualityUtils
+        .isEqual(_hasJSONColumn, that._hasJSONColumn);
   }
 
   /**
@@ -678,6 +689,7 @@ public final class Schema implements Serializable {
     result = EqualityUtils.hashCodeOf(result, _complexFieldSpecs);
     result = EqualityUtils.hashCodeOf(result, _fieldSpecMap);
     result = EqualityUtils.hashCodeOf(result, _primaryKeyColumns);
+    result = EqualityUtils.hashCodeOf(result, _hasJSONColumn);
     return result;
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org