Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/06 23:44:56 UTC

[pinot] branch master updated: push JSON Path evaluation down to storage layer (#7820)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c999a23  push JSON Path evaluation down to storage layer (#7820)
c999a23 is described below

commit c999a23caae3e3769e1729d56da4c5c0de594b31
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Mon Dec 6 23:44:41 2021 +0000

    push JSON Path evaluation down to storage layer (#7820)
    
    Pushes JSON path evaluation down to the storage layer (giving direct access to dictionaries and the forward index), which avoids various intermediate materialisations of strings, byte arrays, and so on. The benefit to users is the potential to avoid much of the allocation of large byte[] and String objects once the JsonPath library can accept UTF-8 encoded byte[]. This also creates an SPI to make the evaluation logic pluggable. The same pushdown mechanism could be abstracted to make extensible to regu [...]
---
 .../apache/pinot/core/common/DataBlockCache.java   | 111 ++++
 .../org/apache/pinot/core/common/DataFetcher.java  | 196 ++++++-
 .../evaluators/DefaultJsonPathEvaluator.java       | 589 +++++++++++++++++++++
 .../core/operator/blocks/ProjectionBlock.java      | 111 ++++
 .../function/IdentifierTransformFunction.java      |  58 +-
 .../JsonExtractScalarTransformFunction.java        | 180 ++++++-
 .../function/PushDownTransformFunction.java        | 124 +++++
 .../apache/pinot/queries/BaseJsonQueryTest.java    | 209 ++++++++
 .../pinot/queries/JsonExtractScalarTest.java       | 157 ++++++
 .../apache/pinot/queries/JsonPathQueriesTest.java  | 171 ++----
 .../segment/spi/evaluator/TransformEvaluator.java  | 155 ++++++
 .../spi/evaluator/json/JsonPathEvaluator.java      |  30 ++
 .../evaluator/json/JsonPathEvaluatorProvider.java  |  33 ++
 .../spi/evaluator/json/JsonPathEvaluators.java     | 147 +++++
 14 files changed, 2111 insertions(+), 160 deletions(-)
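
For orientation, a minimal sketch of the new call path, assuming a caller that
already holds a ProjectionBlock (the wrapper class and column name here are
hypothetical; the Pinot methods are the ones added in this commit):

    import org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator;
    import org.apache.pinot.core.operator.blocks.ProjectionBlock;
    import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;

    final class JsonPathPushDownSketch {
      // Compile the path once per query, then reuse the evaluator per block.
      static int[] extractInts(ProjectionBlock block, String column, String jsonPath) {
        JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(jsonPath, 0);
        int[] buffer = new int[block.getNumDocs()];
        // Evaluation runs against the column's dictionary/forward index,
        // with no intermediate String column materialised.
        block.fillValues(column, evaluator, buffer);
        return buffer;
      }
    }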

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
index 141ecfb..24e71b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.EqualityUtils;
 
@@ -121,6 +122,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the int values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, int[] buffer) {
+    _dataFetcher.fetchIntValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the long values for a single-valued column.
    *
    * @param column Column name
@@ -140,6 +152,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the long values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, long[] buffer) {
+    _dataFetcher.fetchLongValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the float values for a single-valued column.
    *
    * @param column Column name
@@ -159,6 +182,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the float values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, float[] buffer) {
+    _dataFetcher.fetchFloatValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the double values for a single-valued column.
    *
    * @param column Column name
@@ -178,6 +212,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the double values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, double[] buffer) {
+    _dataFetcher.fetchDoubleValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the string values for a single-valued column.
    *
    * @param column Column name
@@ -197,6 +242,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the string values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, String[] buffer) {
+    _dataFetcher.fetchStringValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get byte[] values for the given single-valued column.
    *
    * @param column Column to read
@@ -258,6 +314,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the int[][] values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, int[][] buffer) {
+    _dataFetcher.fetchIntValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the long values for a multi-valued column.
    *
    * @param column Column name
@@ -277,6 +344,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the long[][] values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, long[][] buffer) {
+    _dataFetcher.fetchLongValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the float values for a multi-valued column.
    *
    * @param column Column name
@@ -296,6 +374,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the float[][] values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, float[][] buffer) {
+    _dataFetcher.fetchFloatValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the double values for a multi-valued column.
    *
    * @param column Column name
@@ -315,6 +404,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the double[][] values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, double[][] buffer) {
+    _dataFetcher.fetchDoubleValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the string values for a multi-valued column.
    *
    * @param column Column name
@@ -334,6 +434,17 @@ public class DataBlockCache {
   }
 
   /**
+   * Get the String[][] values for a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, String[][] buffer) {
+    _dataFetcher.fetchStringValues(column, evaluator, _docIds, _length, buffer);
+  }
+
+  /**
    * Get the number of values for a multi-valued column.
    *
    * @param column Column name
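
The fillValues overloads above resolve on the buffer's component type alone, so a
caller selects the single-valued or multi-valued path just by the buffer it passes.
A hedged illustration; 'cache' and 'evaluator' are assumed to be set up elsewhere:

    import org.apache.pinot.core.common.DataBlockCache;
    import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;

    final class FillValuesSketch {
      static void fill(DataBlockCache cache, TransformEvaluator evaluator, int numDocs) {
        int[] sv = new int[numDocs];
        cache.fillValues("jsonColumn", evaluator, sv);   // single-valued ints
        int[][] mv = new int[numDocs][];
        cache.fillValues("jsonColumn", evaluator, mv);   // one int[] per document
      }
    }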
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index e89db7f..458bf84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -101,6 +102,19 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform the int values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchIntValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length, int[] outValues) {
+    _columnValueReaderMap.get(column).readIntValues(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the long values for a single-valued column.
    *
    * @param column Column name
@@ -113,7 +127,21 @@ public class DataFetcher {
   }
 
   /**
-   * Fetch the float values for a single-valued column.
+   * Fetch and transform the long values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchLongValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      long[] outValues) {
+    _columnValueReaderMap.get(column).readLongValues(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
+   * Fetch the float values for a single-valued column.
    *
    * @param column Column name
    * @param inDocIds Input document Ids buffer
@@ -125,6 +153,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform float values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchFloatValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      float[] outValues) {
+    _columnValueReaderMap.get(column).readFloatValues(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the double values for a single-valued column.
    *
    * @param column Column name
@@ -137,6 +179,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform double values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchDoubleValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      double[] outValues) {
+    _columnValueReaderMap.get(column).readDoubleValues(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the string values for a single-valued column.
    *
    * @param column Column name
@@ -149,6 +205,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform String values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchStringValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      String[] outValues) {
+    _columnValueReaderMap.get(column).readStringValues(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch byte[] values for a single-valued column.
    *
    * @param column Column to read
@@ -189,6 +259,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform int[] values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchIntValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      int[][] outValues) {
+    _columnValueReaderMap.get(column).readIntValuesMV(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the long values for a multi-valued column.
    *
    * @param column Column name
@@ -201,6 +285,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform long[] values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchLongValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      long[][] outValues) {
+    _columnValueReaderMap.get(column).readLongValuesMV(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the float values for a multi-valued column.
    *
    * @param column Column name
@@ -213,6 +311,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform float[] values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchFloatValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      float[][] outValues) {
+    _columnValueReaderMap.get(column).readFloatValuesMV(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the double values for a multi-valued column.
    *
    * @param column Column name
@@ -225,6 +337,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform double[] values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchDoubleValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      double[][] outValues) {
+    _columnValueReaderMap.get(column).readDoubleValuesMV(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the string values for a multi-valued column.
    *
    * @param column Column name
@@ -237,6 +363,20 @@ public class DataFetcher {
   }
 
   /**
+   * Fetch and transform String[] values from a column.
+   *
+   * @param column Column name
+   * @param evaluator transform evaluator
+   * @param inDocIds Input document Ids buffer
+   * @param length Number of input document Ids
+   * @param outValues Buffer for output
+   */
+  public void fetchStringValues(String column, TransformEvaluator evaluator, int[] inDocIds, int length,
+      String[][] outValues) {
+    _columnValueReaderMap.get(column).readStringValuesMV(evaluator, inDocIds, length, outValues);
+  }
+
+  /**
    * Fetch the number of values for a multi-valued column.
    *
    * @param column Column name
@@ -317,6 +457,11 @@ public class DataFetcher {
       }
     }
 
+    void readIntValues(TransformEvaluator evaluator, int[] docIds, int length, int[] valueBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valueBuffer);
+    }
+
     void readLongValues(int[] docIds, int length, long[] valueBuffer) {
       ForwardIndexReaderContext readerContext = getReaderContext();
       if (_dictionary != null) {
@@ -356,6 +501,11 @@ public class DataFetcher {
       }
     }
 
+    void readLongValues(TransformEvaluator evaluator, int[] docIds, int length, long[] valueBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valueBuffer);
+    }
+
     void readFloatValues(int[] docIds, int length, float[] valueBuffer) {
       ForwardIndexReaderContext readerContext = getReaderContext();
       if (_dictionary != null) {
@@ -395,6 +545,11 @@ public class DataFetcher {
       }
     }
 
+    void readFloatValues(TransformEvaluator evaluator, int[] docIds, int length, float[] valueBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valueBuffer);
+    }
+
     void readDoubleValues(int[] docIds, int length, double[] valueBuffer) {
       ForwardIndexReaderContext readerContext = getReaderContext();
       if (_dictionary != null) {
@@ -434,6 +589,11 @@ public class DataFetcher {
       }
     }
 
+    void readDoubleValues(TransformEvaluator evaluator, int[] docIds, int length, double[] valueBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valueBuffer);
+    }
+
     void readStringValues(int[] docIds, int length, String[] valueBuffer) {
       ForwardIndexReaderContext readerContext = getReaderContext();
       if (_dictionary != null) {
@@ -478,6 +638,11 @@ public class DataFetcher {
       }
     }
 
+    void readStringValues(TransformEvaluator evaluator, int[] docIds, int length, String[] valueBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valueBuffer);
+    }
+
     void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
       ForwardIndexReaderContext readerContext = getReaderContext();
       if (_dictionary != null) {
@@ -519,6 +684,11 @@ public class DataFetcher {
       }
     }
 
+    void readIntValuesMV(TransformEvaluator evaluator, int[] docIds, int length, int[][] valuesBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valuesBuffer);
+    }
+
     void readLongValuesMV(int[] docIds, int length, long[][] valuesBuffer) {
       assert _dictionary != null;
       for (int i = 0; i < length; i++) {
@@ -529,6 +699,11 @@ public class DataFetcher {
       }
     }
 
+    void readLongValuesMV(TransformEvaluator evaluator, int[] docIds, int length, long[][] valuesBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valuesBuffer);
+    }
+
     void readFloatValuesMV(int[] docIds, int length, float[][] valuesBuffer) {
       assert _dictionary != null;
       for (int i = 0; i < length; i++) {
@@ -539,6 +714,11 @@ public class DataFetcher {
       }
     }
 
+    void readFloatValuesMV(TransformEvaluator evaluator, int[] docIds, int length, float[][] valuesBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valuesBuffer);
+    }
+
     void readDoubleValuesMV(int[] docIds, int length, double[][] valuesBuffer) {
       assert _dictionary != null;
       for (int i = 0; i < length; i++) {
@@ -549,6 +729,11 @@ public class DataFetcher {
       }
     }
 
+    void readDoubleValuesMV(TransformEvaluator evaluator, int[] docIds, int length, double[][] valuesBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valuesBuffer);
+    }
+
     void readStringValuesMV(int[] docIds, int length, String[][] valuesBuffer) {
       assert _dictionary != null;
       for (int i = 0; i < length; i++) {
@@ -559,12 +744,21 @@ public class DataFetcher {
       }
     }
 
+    void readStringValuesMV(TransformEvaluator evaluator, int[] docIds, int length, String[][] valuesBuffer) {
+      evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(),
+          valuesBuffer);
+    }
+
     public void readNumValuesMV(int[] docIds, int length, int[] numValuesBuffer) {
       for (int i = 0; i < length; i++) {
         numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
       }
     }
 
+    private int[] getSVDictIdsBuffer() {
+      return _dictionary == null ? null : THREAD_LOCAL_DICT_IDS.get();
+    }
+
     @Override
     public void close()
         throws IOException {
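
The reader-side contract is visible in getSVDictIdsBuffer() above: the dictId
scratch buffer is non-null only for dictionary-encoded columns. A sketch of what
one int-producing evaluateBlock body has to handle, assuming a STRING column whose
values parse as integers (illustrative only, not part of this commit; the other
overloads TransformEvaluator requires are elided):

    import org.apache.pinot.segment.spi.index.reader.Dictionary;
    import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
    import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;

    final class EvaluateBlockSketch {
      static <T extends ForwardIndexReaderContext> void evaluateIntBlock(int[] docIds,
          int length, ForwardIndexReader<T> reader, T context, Dictionary dictionary,
          int[] dictIdsBuffer, int[] valueBuffer) {
        if (reader.isDictionaryEncoded()) {
          // Dictionary-encoded column: resolve dictIds once, then look up each value.
          reader.readDictIds(docIds, length, dictIdsBuffer, context);
          for (int i = 0; i < length; i++) {
            valueBuffer[i] = Integer.parseInt(dictionary.getStringValue(dictIdsBuffer[i]));
          }
        } else {
          // Raw column: read each document's value straight from the forward index.
          for (int i = 0; i < length; i++) {
            valueBuffer[i] = Integer.parseInt(reader.getString(docIds[i], context));
          }
        }
      }
    }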
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
new file mode 100644
index 0000000..c399b76
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
@@ -0,0 +1,589 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.evaluators;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.ParseContext;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.function.JsonPathCache;
+import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
+
+  private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
+      new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider())
+          .mappingProvider(new JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());
+
+  private static final int[] EMPTY_INTS = new int[0];
+  private static final long[] EMPTY_LONGS = new long[0];
+  private static final float[] EMPTY_FLOATS = new float[0];
+  private static final double[] EMPTY_DOUBLES = new double[0];
+  private static final String[] EMPTY_STRINGS = new String[0];
+
+  public static JsonPathEvaluator create(String jsonPath, @Nullable Object defaultValue) {
+    try {
+      return new DefaultJsonPathEvaluator(JsonPathCache.INSTANCE.getOrCompute(jsonPath), defaultValue);
+    } catch (InvalidPathException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  private final JsonPath _jsonPath;
+  private final Object _defaultValue;
+
+  private DefaultJsonPathEvaluator(JsonPath jsonPath, @Nullable Object defaultValue) {
+    _jsonPath = jsonPath;
+    _defaultValue = defaultValue;
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, int[] valueBuffer) {
+    int defaultValue = (_defaultValue instanceof Number) ? ((Number) _defaultValue).intValue() : 0;
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromString(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromString(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromBytes(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, long[] valueBuffer) {
+    long defaultValue = (_defaultValue instanceof Number) ? ((Number) _defaultValue).longValue() : 0L;
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromString(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromString(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromBytes(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, float[] valueBuffer) {
+    float defaultValue = (_defaultValue instanceof Number) ? ((Number) _defaultValue).floatValue() : 0F;
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromString(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromString(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromBytes(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, double[] valueBuffer) {
+    double defaultValue = (_defaultValue instanceof Number) ? ((Number) _defaultValue).doubleValue() : 0D;
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromString(dictionary, dictIdsBuffer[i]), defaultValue, valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromString(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromBytes(reader, context, docIds[i]), defaultValue, valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, String[] valueBuffer) {
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processValue(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processValue(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, int[][] valueBuffer) {
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, long[][] valueBuffer) {
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, float[][] valueBuffer) {
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, double[][] valueBuffer) {
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, String[][] valueBuffer) {
+    if (reader.isDictionaryEncoded()) {
+      reader.readDictIds(docIds, length, dictIdsBuffer, context);
+      if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      } else {
+        for (int i = 0; i < length; i++) {
+          processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+        }
+      }
+    } else {
+      switch (reader.getValueType()) {
+        case JSON:
+        case STRING:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        case BYTES:
+          for (int i = 0; i < length; i++) {
+            processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+          }
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  private <T> T extractFromBytes(Dictionary dictionary, int dictId) {
+    try {
+      // TODO make JsonPath accept byte[] - Jackson can
+      return JSON_PARSER_CONTEXT.parse(new String(dictionary.getBytesValue(dictId), StandardCharsets.UTF_8))
+          .read(_jsonPath);
+    } catch (Exception e) {
+      // TODO JsonPath 2.7.0 will not throw here but produce null when path not found
+      if (_defaultValue == null) {
+        throwPathNotFoundException(e);
+      }
+      return null;
+    }
+  }
+
+  private <T, R extends ForwardIndexReaderContext> T extractFromBytes(ForwardIndexReader<R> reader, R context,
+      int docId) {
+    try {
+      // TODO make JsonPath accept byte[] - Jackson can
+      return JSON_PARSER_CONTEXT.parse(new String(reader.getBytes(docId, context), StandardCharsets.UTF_8))
+          .read(_jsonPath);
+    } catch (Exception e) {
+      // TODO JsonPath 2.7.0 will not throw here but produce null when path not found
+      if (_defaultValue == null) {
+        throwPathNotFoundException(e);
+      }
+      return null;
+    }
+  }
+
+  private <T> T extractFromString(Dictionary dictionary, int dictId) {
+    try {
+      return JSON_PARSER_CONTEXT.parse(dictionary.getStringValue(dictId)).read(_jsonPath);
+    } catch (Exception e) {
+      // TODO JsonPath 2.7.0 will not throw here but produce null when path not found
+      if (_defaultValue == null) {
+        throwPathNotFoundException(e);
+      }
+      return null;
+    }
+  }
+
+  private <T, R extends ForwardIndexReaderContext> T extractFromString(ForwardIndexReader<R> reader, R context,
+      int docId) {
+    try {
+      return JSON_PARSER_CONTEXT.parse(reader.getString(docId, context)).read(_jsonPath);
+    } catch (Exception e) {
+      // TODO JsonPath 2.7.0 will not throw here but produce null when path not found
+      if (_defaultValue == null) {
+        throwPathNotFoundException(e);
+      }
+      return null;
+    }
+  }
+
+  private void processValue(int index, Object value, int defaultValue, int[] valueBuffer) {
+    if (value instanceof Number) {
+      valueBuffer[index] = ((Number) value).intValue();
+    } else if (value == null) {
+      if (_defaultValue != null) {
+        valueBuffer[index] = defaultValue;
+      } else {
+        throwPathNotFoundException();
+      }
+    } else {
+      valueBuffer[index] = Integer.parseInt(value.toString());
+    }
+  }
+
+  private void processValue(int index, Object value, long defaultValue, long[] valueBuffer) {
+    if (value instanceof Number) {
+      valueBuffer[index] = ((Number) value).longValue();
+    } else if (value == null) {
+      if (_defaultValue != null) {
+        valueBuffer[index] = defaultValue;
+      } else {
+        throwPathNotFoundException();
+      }
+    } else {
+      // Handle scientific notation
+      valueBuffer[index] = (long) Double.parseDouble(value.toString());
+    }
+  }
+
+  private void processValue(int index, Object value, float defaultValue, float[] valueBuffer) {
+    if (value instanceof Number) {
+      valueBuffer[index] = ((Number) value).floatValue();
+    } else if (value == null) {
+      if (_defaultValue != null) {
+        valueBuffer[index] = defaultValue;
+      } else {
+        throwPathNotFoundException();
+      }
+    } else {
+      valueBuffer[index] = Float.parseFloat(value.toString());
+    }
+  }
+
+  private void processValue(int index, Object value, double defaultValue, double[] valueBuffer) {
+    if (value instanceof Number) {
+      valueBuffer[index] = ((Number) value).doubleValue();
+    } else if (value == null) {
+      if (_defaultValue != null) {
+        valueBuffer[index] = defaultValue;
+      } else {
+        throwPathNotFoundException();
+      }
+    } else {
+      valueBuffer[index] = Double.parseDouble(value.toString());
+    }
+  }
+
+  private void processValue(int index, Object value, String[] valueBuffer) {
+    if (value instanceof String) {
+      valueBuffer[index] = (String) value;
+    } else if (value == null) {
+      if (_defaultValue != null) {
+        valueBuffer[index] = _defaultValue.toString();
+      } else {
+        throwPathNotFoundException();
+      }
+    } else {
+      valueBuffer[index] = JsonUtils.objectToJsonNode(value).toString();
+    }
+  }
+
+  private void processList(int index, List<Integer> value, int[][] valuesBuffer) {
+    if (value == null) {
+      valuesBuffer[index] = EMPTY_INTS;
+    } else {
+      int numValues = value.size();
+      int[] values = new int[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = value.get(j);
+      }
+      valuesBuffer[index] = values;
+    }
+  }
+
+  private void processList(int index, List<Long> value, long[][] valuesBuffer) {
+    if (value == null) {
+      valuesBuffer[index] = EMPTY_LONGS;
+    } else {
+      int numValues = value.size();
+      long[] values = new long[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = value.get(j);
+      }
+      valuesBuffer[index] = values;
+    }
+  }
+
+  private void processList(int index, List<Float> value, float[][] valuesBuffer) {
+    if (value == null) {
+      valuesBuffer[index] = EMPTY_FLOATS;
+    } else {
+      int numValues = value.size();
+      float[] values = new float[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = value.get(j);
+      }
+      valuesBuffer[index] = values;
+    }
+  }
+
+  private void processList(int index, List<Double> value, double[][] valuesBuffer) {
+    if (value == null) {
+      valuesBuffer[index] = EMPTY_DOUBLES;
+    } else {
+      int numValues = value.size();
+      double[] values = new double[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = value.get(j);
+      }
+      valuesBuffer[index] = values;
+    }
+  }
+
+  private void processList(int index, List<String> value, String[][] valuesBuffer) {
+    if (value == null) {
+      valuesBuffer[index] = EMPTY_STRINGS;
+    } else {
+      int numValues = value.size();
+      String[] values = new String[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = value.get(j);
+      }
+      valuesBuffer[index] = values;
+    }
+  }
+
+  private void throwPathNotFoundException() {
+    throw new IllegalArgumentException("Illegal Json Path: " + _jsonPath.getPath() + " does not match document");
+  }
+
+  private void throwPathNotFoundException(Exception e) {
+    throw new IllegalArgumentException("Illegal Json Path: " + _jsonPath.getPath() + " does not match document", e);
+  }
+}
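
A short usage sketch of the factory (the path and default values are illustrative):
with a non-null default, documents the path does not match fill the default; with
null, they raise IllegalArgumentException via throwPathNotFoundException.

    import org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator;
    import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;

    final class EvaluatorFactorySketch {
      // Malformed paths fail fast: create() rethrows InvalidPathException
      // as IllegalArgumentException.
      static final JsonPathEvaluator LENIENT = DefaultJsonPathEvaluator.create("$.a.b", -1);
      static final JsonPathEvaluator STRICT = DefaultJsonPathEvaluator.create("$.a.b", null);
    }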
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
index f7017ea..7bce9dd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.DataBlockCache;
 import org.apache.pinot.core.operator.docvalsets.ProjectionBlockValSet;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
 
 
 /**
@@ -69,4 +70,114 @@ public class ProjectionBlock implements Block {
   public BlockMetadata getMetadata() {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing an int value per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, int[] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a long value per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, long[] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a float value per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, float[] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a double value per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, double[] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a String value per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, String[] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing an int[] per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, int[][] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a long[] per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, long[][] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a float[] per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, float[][] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a double[] per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, double[][] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
+
+  /**
+   * Pushes a {@link TransformEvaluator} down to be evaluated against the column,
+   * producing a String[] per document. This is an unstable API.
+   * @param column column to evaluate against
+   * @param evaluator the evaluator which produces values from the storage in the column
+   * @param buffer the buffer to write outputs into
+   */
+  public void fillValues(String column, TransformEvaluator evaluator, String[][] buffer) {
+    _dataBlockCache.fillValues(column, evaluator, buffer);
+  }
 }
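
For the multi-valued String path, each document yields one String[]; a document
the path does not match comes back as an empty array rather than null (see
processList in DefaultJsonPathEvaluator above). A hedged sketch, harness hypothetical:

    import org.apache.pinot.core.operator.blocks.ProjectionBlock;
    import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;

    final class MultiValueFillSketch {
      static String[][] extractStringArrays(ProjectionBlock block, String column,
          TransformEvaluator evaluator) {
        String[][] out = new String[block.getNumDocs()][];
        block.fillValues(column, evaluator, out);
        return out;
      }
    }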
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
index 39c45dd..eb48436 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IdentifierTransformFunction.java
@@ -24,6 +24,7 @@ import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 
 
@@ -31,7 +32,7 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
  * The <code>IdentifierTransformFunction</code> class is a special transform function which is a wrapper on top of an
  * IDENTIFIER (column), and directly returns the column value without any transformation.
  */
-public class IdentifierTransformFunction implements TransformFunction {
+public class IdentifierTransformFunction implements TransformFunction, PushDownTransformFunction {
   private final String _columnName;
   private final Dictionary _dictionary;
   private final TransformResultMetadata _resultMetadata;
@@ -128,4 +129,59 @@ public class IdentifierTransformFunction implements TransformFunction {
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
     return projectionBlock.getBlockValueSet(_columnName).getStringValuesMV();
   }
+
+  @Override
+  public void transformToIntValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, int[] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToLongValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, long[] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToFloatValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, float[] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToDoubleValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
+      double[] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToStringValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
+      String[] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToIntValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, int[][] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToLongValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, long[][] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToFloatValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
+      float[][] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToDoubleValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
+      double[][] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
+
+  @Override
+  public void transformToStringValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
+      String[][] buffer) {
+    projectionBlock.fillValues(_columnName, evaluator, buffer);
+  }
 }
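
PushDownTransformFunction itself is listed in the file summary above but its source
is not shown in this excerpt; from the overrides in IdentifierTransformFunction its
shape is presumably the following (a reconstruction, not the committed file):

    import org.apache.pinot.core.operator.blocks.ProjectionBlock;
    import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;

    public interface PushDownTransformFunction {
      void transformToIntValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
          int[] buffer);
      void transformToLongValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator,
          long[] buffer);
      // ... the float/double/String SV variants and the int[][] .. String[][] MV
      // variants follow the same shape (see the overrides above).
    }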
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
index 014e9cb..7bcff9b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
@@ -29,8 +29,9 @@ import java.util.Map;
 import org.apache.pinot.common.function.JsonPathCache;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
+import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluators;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.JsonUtils;
 
@@ -59,7 +60,8 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
   private TransformFunction _jsonFieldTransformFunction;
   private String _jsonPathString;
   private JsonPath _jsonPath;
-  private Object _defaultValue = null;
+  private Object _defaultValue;
+  private JsonPathEvaluator _jsonPathEvaluator;
   private TransformResultMetadata _resultMetadata;
 
   @Override
@@ -84,7 +86,6 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
     }
     _jsonFieldTransformFunction = firstArgument;
     _jsonPathString = ((LiteralTransformFunction) arguments.get(1)).getLiteral();
-    _jsonPath = JsonPathCache.INSTANCE.getOrCompute(_jsonPathString);
     String resultsType = ((LiteralTransformFunction) arguments.get(2)).getLiteral().toUpperCase();
     boolean isSingleValue = !resultsType.endsWith("_ARRAY");
     try {
@@ -94,6 +95,7 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
         _defaultValue = dataType.convert(((LiteralTransformFunction) arguments.get(3)).getLiteral());
       }
       _resultMetadata = new TransformResultMetadata(dataType, isSingleValue, false);
+      _jsonPathEvaluator = JsonPathEvaluators.create(_jsonPathString, _defaultValue);
     } catch (Exception e) {
       throw new IllegalStateException(String.format(
           "Unsupported results type: %s for jsonExtractScalar function. Supported types are: "
@@ -109,10 +111,22 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    if (_intValuesSV == null) {
-      _intValuesSV = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToIntValuesSV(projectionBlock, _jsonPathEvaluator, _intValuesSV);
+      return _intValuesSV;
     }
+    return transformTransformedValuesToIntValuesSV(projectionBlock);
+  }
 
+  private int[] transformTransformedValuesToIntValuesSV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int numDocs = projectionBlock.getNumDocs();
     for (int i = 0; i < numDocs; i++) {
@@ -140,10 +154,21 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    if (_longValuesSV == null) {
-      _longValuesSV = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToLongValuesSV(projectionBlock, _jsonPathEvaluator, _longValuesSV);
+      return _longValuesSV;
     }
+    return transformTransformedValuesToLongValuesSV(projectionBlock);
+  }
 
+  private long[] transformTransformedValuesToLongValuesSV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int numDocs = projectionBlock.getNumDocs();
     for (int i = 0; i < numDocs; i++) {
@@ -172,10 +197,21 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    if (_floatValuesSV == null) {
-      _floatValuesSV = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToFloatValuesSV(projectionBlock, _jsonPathEvaluator, _floatValuesSV);
+      return _floatValuesSV;
     }
+    return transformTransformedValuesToFloatValuesSV(projectionBlock);
+  }
 
+  private float[] transformTransformedValuesToFloatValuesSV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int numDocs = projectionBlock.getNumDocs();
     for (int i = 0; i < numDocs; i++) {
@@ -203,10 +239,21 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    if (_doubleValuesSV == null) {
-      _doubleValuesSV = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToDoubleValuesSV(projectionBlock, _jsonPathEvaluator, _doubleValuesSV);
+      return _doubleValuesSV;
     }
+    return transformTransformedValuesToDoubleValuesSV(projectionBlock);
+  }
 
+  private double[] transformTransformedValuesToDoubleValuesSV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int numDocs = projectionBlock.getNumDocs();
     for (int i = 0; i < numDocs; i++) {
@@ -234,10 +281,21 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    if (_stringValuesSV == null) {
-      _stringValuesSV = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToStringValuesSV(projectionBlock, _jsonPathEvaluator, _stringValuesSV);
+      return _stringValuesSV;
     }
+    return transformTransformedValuesToStringValuesSV(projectionBlock);
+  }
 
+  private String[] transformTransformedValuesToStringValuesSV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int numDocs = projectionBlock.getNumDocs();
     for (int i = 0; i < numDocs; i++) {
@@ -265,10 +323,22 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    if (_intValuesMV == null) {
-      _intValuesMV = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToIntValuesMV(projectionBlock, _jsonPathEvaluator, _intValuesMV);
+      return _intValuesMV;
+    }
+
+    return transformTransformedValuesToIntValuesMV(projectionBlock);
+  }
 
+  private int[][] transformTransformedValuesToIntValuesMV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int numDocs = projectionBlock.getNumDocs();
     for (int i = 0; i < numDocs; i++) {
@@ -293,10 +363,22 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    if (_longValuesMV == null) {
-      _longValuesMV = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToLongValuesMV(projectionBlock, _jsonPathEvaluator, _longValuesMV);
+      return _longValuesMV;
     }
 
+    return transformTransformedValuesToLongValuesMV(projectionBlock);
+  }
+
+  private long[][] transformTransformedValuesToLongValuesMV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int length = projectionBlock.getNumDocs();
     for (int i = 0; i < length; i++) {
@@ -321,10 +403,22 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    if (_floatValuesMV == null) {
-      _floatValuesMV = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToFloatValuesMV(projectionBlock, _jsonPathEvaluator, _floatValuesMV);
+      return _floatValuesMV;
+    }
+
+    return transformTransformedValuesToFloatValuesMV(projectionBlock);
+  }
 
+  private float[][] transformTransformedValuesToFloatValuesMV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int length = projectionBlock.getNumDocs();
     for (int i = 0; i < length; i++) {
@@ -349,8 +443,30 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    if (_doubleValuesMV == null) {
-      _doubleValuesMV = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToDoubleValuesMV(projectionBlock, _jsonPathEvaluator, _doubleValuesMV);
+      return _doubleValuesMV;
+    }
+
+    return transformTransformedValuesToDoubleValuesMV(projectionBlock);
+  }
+
+  private double[][] transformTransformedValuesToDoubleValuesMV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
-    }
 
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
@@ -377,10 +493,22 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    if (_stringValuesMV == null) {
-      _stringValuesMV = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    if (_jsonFieldTransformFunction instanceof PushDownTransformFunction) {
+      ((PushDownTransformFunction) _jsonFieldTransformFunction)
+          .transformToStringValuesMV(projectionBlock, _jsonPathEvaluator, _stringValuesMV);
+      return _stringValuesMV;
     }
 
+    return transformTransformedValuesToStringValuesMV(projectionBlock);
+  }
+
+  private String[][] transformTransformedValuesToStringValuesMV(ProjectionBlock projectionBlock) {
+    // operating on the output of another transform so can't pass the evaluation down to the storage
+    ensureJsonPathCompiled();
     String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     int length = projectionBlock.getNumDocs();
     for (int i = 0; i < length; i++) {
@@ -402,4 +530,10 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
     }
     return _stringValuesMV;
   }
+
+  private void ensureJsonPathCompiled() {
+    if (_jsonPath == null) {
+      _jsonPath = JsonPathCache.INSTANCE.getOrCompute(_jsonPathString);
+    }
+  }
 }
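
    When the first argument is not a plain column reference, evaluation cannot be pushed down,
    so the JSON path is compiled lazily and applied to materialised strings. A minimal sketch of
    that fallback under the Jayway JsonPath API (the method name and error handling here are
    illustrative; the real loop bodies, unchanged by this commit, handle more conversion cases):

        // Illustrative fallback (long SV case): compile the path once, then evaluate it
        // against each JSON string produced by the upstream transform.
        private long[] fallbackToLongValuesSV(ProjectionBlock projectionBlock) {
          ensureJsonPathCompiled(); // lazily compiles _jsonPathString through JsonPathCache
          String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
          int numDocs = projectionBlock.getNumDocs();
          for (int i = 0; i < numDocs; i++) {
            try {
              // JsonPath#read(String) parses the document and extracts the matched value
              _longValuesSV[i] = ((Number) _jsonPath.read(jsonStrings[i])).longValue();
            } catch (Exception e) {
              // assumed behaviour: fall back to the user-supplied default when the path is absent
              _longValuesSV[i] = ((Number) _defaultValue).longValue();
            }
          }
          return _longValuesSV;
        }
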
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
new file mode 100644
index 0000000..743d715
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/PushDownTransformFunction.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
+
+
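+/**
+ * Implemented by transform functions which can accept a {@link TransformEvaluator} and push its
+ * evaluation down to the storage layer, giving it direct access to the forward index and
+ * dictionary instead of materialised values.
+ */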
+public interface PushDownTransformFunction {
+
+  /**
+   * SINGLE-VALUED APIs
+   */
+
+  /**
+   * Transforms the data from the given projection block to single-valued int values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToIntValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, int[] buffer);
+
+  /**
+   * Transforms the data from the given projection block to single-valued long values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToLongValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, long[] buffer);
+
+  /**
+   * Transforms the data from the given projection block to single-valued float values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToFloatValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, float[] buffer);
+
+  /**
+   * Transforms the data from the given projection block to single-valued double values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToDoubleValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, double[] buffer);
+
+  /**
+   * Transforms the data from the given projection block to single-valued string values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToStringValuesSV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, String[] buffer);
+
+  /**
+   * MULTI-VALUED APIs
+   */
+
+  /**
+   * Transforms the data from the given projection block to multi-valued int values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToIntValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, int[][] buffer);
+
+  /**
+   * Transforms the data from the given projection block to multi-valued long values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToLongValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, long[][] buffer);
+
+  /**
+   * Transforms the data from the given projection block to multi-valued float values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToFloatValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, float[][] buffer);
+
+  /**
+   * Transforms the data from the given projection block to multi-valued double values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToDoubleValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, double[][] buffer);
+
+  /**
+   * Transforms the data from the given projection block to multi-valued string values.
+   *
+   * @param projectionBlock Projection result
+   * @param evaluator transform evaluator
+   * @param buffer values to fill
+   */
+  void transformToStringValuesMV(ProjectionBlock projectionBlock, TransformEvaluator evaluator, String[][] buffer);
+}
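
    A sketch of how a consumer is expected to dispatch through this interface (the helper
    below is illustrative and not part of the commit):

        // Push evaluation down when the argument is a plain column reference; otherwise
        // fall back to evaluating over the strings the upstream transform materialises.
        static void fillIntValuesSV(ProjectionBlock block, TransformFunction argument,
            TransformEvaluator evaluator, int[] buffer) {
          if (argument instanceof PushDownTransformFunction) {
            // direct access to the forward index and dictionary; no String/byte[] copies
            ((PushDownTransformFunction) argument).transformToIntValuesSV(block, evaluator, buffer);
          } else {
            String[] jsonStrings = argument.transformToStringValuesSV(block);
            // ... evaluate over jsonStrings, as JsonExtractScalarTransformFunction does
          }
        }
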
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseJsonQueryTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseJsonQueryTest.java
new file mode 100644
index 0000000..5295412
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseJsonQueryTest.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+
+
+public abstract class BaseJsonQueryTest extends BaseQueriesTest {
+
+  static final String RAW_TABLE_NAME = "testTable";
+  static final String SEGMENT_NAME = "testSegment";
+
+  static final String INT_COLUMN = "intColumn";
+  static final String LONG_COLUMN = "longColumn";
+  static final String STRING_COLUMN = "stringColumn";
+  static final String JSON_COLUMN = "jsonColumn";
+  static final String RAW_JSON_COLUMN = "rawJsonColumn";
+  static final String RAW_BYTES_COLUMN = "rawBytesColumn";
+  static final String DICTIONARY_BYTES_COLUMN = "dictionaryBytesColumn";
+  static final String RAW_STRING_COLUMN = "rawStringColumn";
+  static final String DICTIONARY_STRING_COLUMN = "dictionaryStringColumn";
+  static final String JSON_COLUMN_WITHOUT_INDEX = "jsonColumnWithoutIndex";
+
+  @DataProvider
+  public static Object[][] allJsonColumns() {
+    return new Object[][]{
+        {JSON_COLUMN},
+        {RAW_JSON_COLUMN},
+        {JSON_COLUMN_WITHOUT_INDEX},
+        {RAW_BYTES_COLUMN},
+        {DICTIONARY_BYTES_COLUMN},
+        {RAW_STRING_COLUMN},
+        {DICTIONARY_STRING_COLUMN},
+    };
+  }
+
+  @DataProvider
+  public static Object[][] nativeJsonColumns() {
+    return new Object[][]{
+        {JSON_COLUMN},
+        {RAW_JSON_COLUMN},
+        {JSON_COLUMN_WITHOUT_INDEX},
+    };
+  }
+
+  @DataProvider
+  public static Object[][] nonNativeJsonColumns() {
+    // columns where JSON can still be extracted with jsonExtractScalar, but which don't support the native JSON
+    // path (dot notation) syntax available on true JSON columns
+    return new Object[][]{
+        {RAW_BYTES_COLUMN},
+        {DICTIONARY_BYTES_COLUMN},
+        {RAW_STRING_COLUMN},
+        {DICTIONARY_STRING_COLUMN},
+    };
+  }
+
+  protected IndexSegment _indexSegment;
+  protected List<IndexSegment> _indexSegments;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    File indexDir = indexDir();
+    FileUtils.deleteDirectory(indexDir);
+
+    TableConfig tableConfig = tableConfig();
+
+    List<GenericRow> records = new ArrayList<>(numRecords());
+    records.add(createRecord(1, 1, "daffy duck",
+        "{\"name\": {\"first\": \"daffy\", \"last\": \"duck\"}, \"id\": 101, \"data\": [\"a\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(2, 2, "mickey mouse",
+        "{\"name\": {\"first\": \"mickey\", \"last\": \"mouse\"}, \"id\": 111, \"data\": [\"e\", \"b\", \"c\", "
+            + "\"d\"]}"));
+    records.add(createRecord(3, 3, "donald duck",
+        "{\"name\": {\"first\": \"donald\", \"last\": \"duck\"}, \"id\": 121, \"data\": [\"f\", \"b\", \"c\", "
+            + "\"d\"]}"));
+    records.add(createRecord(4, 4, "scrooge mcduck",
+        "{\"name\": {\"first\": \"scrooge\", \"last\": \"mcduck\"}, \"id\": 131, \"data\": [\"g\", \"b\", \"c\", "
+            + "\"d\"]}"));
+    records.add(createRecord(5, 5, "minnie mouse",
+        "{\"name\": {\"first\": \"minnie\", \"last\": \"mouse\"}, \"id\": 141, \"data\": [\"h\", \"b\", \"c\", "
+            + "\"d\"]}"));
+    records.add(createRecord(6, 6, "daisy duck",
+        "{\"name\": {\"first\": \"daisy\", \"last\": \"duck\"}, \"id\": 161.5, \"data\": [\"i\", \"b\", \"c\", "
+            + "\"d\"]}"));
+    records.add(createRecord(7, 7, "pluto dog",
+        "{\"name\": {\"first\": \"pluto\", \"last\": \"dog\"}, \"id\": 161, \"data\": [\"j\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(8, 8, "goofy dwag",
+        "{\"name\": {\"first\": \"goofy\", \"last\": \"dwag\"}, \"id\": 171, \"data\": [\"k\", \"b\", \"c\", \"d\"]}"));
+    records.add(createRecord(9, 9, "ludwik von drake",
+        "{\"name\": {\"first\": \"ludwik\", \"last\": \"von drake\"}, \"id\": 181, \"data\": [\"l\", \"b\", \"c\", "
+            + "\"d\"]}"));
+    records.add(createRecord(10, 10, "nested array",
+        "{\"name\":{\"first\":\"nested\",\"last\":\"array\"},\"id\":111,\"data\":[{\"e\":[{\"x\":[{\"i1\":1,"
+            + "\"i2\":2}]},{\"y\":[{\"i1\":1,\"i2\":2}]},{\"z\":[{\"i1\":1,\"i2\":2}]}]},{\"b\":[{\"x\":[{\"i1\":1,"
+            + "\"i2\":2}]},{\"y\":[{\"i1\":1,\"i2\":2}]},{\"z\":[{\"i1\":10,\"i2\":20}]}]}]}"));
+    records.add(createRecord(11, 11, "multi-dimensional-1 array",
+        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],"
+            + "[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
+    records.add(createRecord(12, 12, "multi-dimensional-2 array",
+        "{\"name\": {\"first\": \"multi-dimensional-2\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],"
+            + "[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
+    records.add(createRecord(13, 13, "multi-dimensional-1 array",
+        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],"
+            + "[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
+    records.add(createRecord(13, 13, "days",
+        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"days\": 111}"));
+    records.add(createRecord(14, 14, "top level array", "[{\"i1\":1,\"i2\":2}, {\"i1\":3,\"i2\":4}]"));
+
+    List<String> jsonIndexColumns = new ArrayList<>();
+    jsonIndexColumns.add("jsonColumn");
+    tableConfig.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema());
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(indexDir.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setTableConfig(tableConfig);
+    indexLoadingConfig.setJsonIndexColumns(new HashSet<>(jsonIndexColumns));
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(new File(indexDir, SEGMENT_NAME), indexLoadingConfig);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  protected void checkResult(String query, Object[][] expecteds) {
+    BrokerResponseNative response1 = getBrokerResponseForOptimizedSqlQuery(query, tableConfig(), schema());
+    List<Object[]> rows = response1.getResultTable().getRows();
+
+    Assert.assertEquals(rows.size(), expecteds.length);
+    for (int i = 0; i < rows.size(); i++) {
+      Object[] actual = rows.get(i);
+      Object[] expected = expecteds[i];
+      Assert.assertEquals(actual, expected);
+    }
+  }
+
+  int numRecords() {
+    return 15;
+  }
+
+  abstract TableConfig tableConfig();
+
+  abstract Schema schema();
+
+  File indexDir() {
+    return new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
+  }
+
+  abstract GenericRow createRecord(int intValue, long longValue, String stringValue, String jsonValue);
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonExtractScalarTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonExtractScalarTest.java
new file mode 100644
index 0000000..7ab33ff
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonExtractScalarTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+
+public class JsonExtractScalarTest extends BaseJsonQueryTest {
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+      .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(JSON_COLUMN, FieldSpec.DataType.JSON)
+      .addSingleValueDimension(RAW_JSON_COLUMN, FieldSpec.DataType.JSON)
+      .addSingleValueDimension(RAW_BYTES_COLUMN, FieldSpec.DataType.BYTES)
+      .addSingleValueDimension(RAW_STRING_COLUMN, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(DICTIONARY_BYTES_COLUMN, FieldSpec.DataType.BYTES)
+      .addSingleValueDimension(DICTIONARY_STRING_COLUMN, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(JSON_COLUMN_WITHOUT_INDEX, FieldSpec.DataType.JSON).build();
+
+  private static final FieldConfig RAW_JSON_COLUMN_CONFIG = new FieldConfig(RAW_JSON_COLUMN,
+      FieldConfig.EncodingType.RAW, ImmutableList.of(), FieldConfig.CompressionCodec.LZ4, ImmutableMap.of());
+  private static final FieldConfig RAW_BYTES_COLUMN_CONFIG = new FieldConfig(RAW_BYTES_COLUMN,
+      FieldConfig.EncodingType.RAW, ImmutableList.of(), FieldConfig.CompressionCodec.LZ4, ImmutableMap.of());
+  private static final FieldConfig RAW_STRING_COLUMN_CONFIG = new FieldConfig(RAW_STRING_COLUMN,
+      FieldConfig.EncodingType.RAW, ImmutableList.of(), FieldConfig.CompressionCodec.LZ4, ImmutableMap.of());
+
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+          .setFieldConfigList(
+              ImmutableList.of(RAW_JSON_COLUMN_CONFIG, RAW_BYTES_COLUMN_CONFIG, RAW_STRING_COLUMN_CONFIG))
+          .build();
+
+  GenericRow createRecord(int intValue, long longValue, String stringValue, String jsonValue) {
+    GenericRow record = new GenericRow();
+    record.putValue(INT_COLUMN, intValue);
+    record.putValue(LONG_COLUMN, longValue);
+    record.putValue(STRING_COLUMN, stringValue);
+    record.putValue(JSON_COLUMN, jsonValue);
+    record.putValue(RAW_JSON_COLUMN, jsonValue);
+    record.putValue(JSON_COLUMN_WITHOUT_INDEX, jsonValue);
+    record.putValue(RAW_BYTES_COLUMN, jsonValue.getBytes(StandardCharsets.UTF_8));
+    record.putValue(DICTIONARY_BYTES_COLUMN, jsonValue.getBytes(StandardCharsets.UTF_8));
+    record.putValue(RAW_STRING_COLUMN, jsonValue);
+    record.putValue(DICTIONARY_STRING_COLUMN, jsonValue);
+    return record;
+  }
+
+  @Override
+  TableConfig tableConfig() {
+    return TABLE_CONFIG;
+  }
+
+  @Override
+  Schema schema() {
+    return SCHEMA;
+  }
+
+  @Test(dataProvider = "allJsonColumns")
+  public void testExtractJsonField(String column) {
+    Object[][] expecteds1 = {{"duck"}, {"mouse"}, {"duck"}};
+    checkResult("SELECT jsonextractscalar(" + column + ", '$.name.last', 'STRING') FROM testTable LIMIT 3", expecteds1);
+  }
+
+  @Test(dataProvider = "allJsonColumns")
+  public void testNestedExtractJsonField(String column) {
+    Object[][] expecteds1 = {{"duck"}, {"mouse"}, {"duck"}};
+    checkResult("SELECT jsonextractscalar(jsonextractscalar(" + column
+        + ", '$.name', 'STRING'), '$.last', 'STRING') FROM testTable LIMIT 3", expecteds1);
+  }
+
+  @Test(dataProvider = "nativeJsonColumns")
+  public void testJsonSelect(String column) {
+    // SELECT using a simple json path expression.
+    Object[][] expecteds1 = {{"duck"}, {"mouse"}, {"duck"}};
+    checkResult("SELECT " + column + ".name.last FROM testTable LIMIT 3", expecteds1);
+
+    Object[][] expecteds2 =
+        {{"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"1"}};
+    checkResult("SELECT " + column + ".data[0].e[2].z[0].i1 FROM testTable", expecteds2);
+  }
+
+  /** Test that a json path expression in GROUP BY clause is properly converted into a JSON_EXTRACT_SCALAR function. */
+  @Test(dataProvider = "nativeJsonColumns")
+  public void testJsonGroupBy(String column) {
+    Object[][] expecteds1 =
+        {
+            {"111", 20L}, {"101", 4L}, {"null", 8L}, {"181", 4L}, {"161.5", 4L}, {"171", 4L}, {"161", 4L}, {"141", 4L},
+            {"131", 4L}, {"121", 4L}
+        };
+    checkResult("SELECT " + column + ".id, count(*) FROM testTable GROUP BY " + column + ".id", expecteds1);
+  }
+
+  /** Test that a json path expression in HAVING clause is properly converted into a JSON_EXTRACT_SCALAR function. */
+  @Test(dataProvider = "nativeJsonColumns")
+  public void testJsonGroupByHaving(String column) {
+    Object[][] expecteds1 = {{"mouse", 8L}};
+    checkResult(
+        "SELECT " + column + ".name.last, count(*) FROM testTable GROUP BY " + column + ".name.last HAVING " + column
+            + ".name"
+            + ".last = 'mouse'", expecteds1);
+  }
+
+  /** Test a complex SQL statement with json path expression in SELECT, WHERE, and GROUP BY clauses. */
+  @Test(dataProvider = "nativeJsonColumns")
+  public void testJsonSelectFilterGroupBy(String column) {
+    Object[][] expecteds1 = {{"duck", 4L}};
+    checkResult(
+        "SELECT " + column + ".name.last, count(*) FROM testTable WHERE " + column + ".id = 101 GROUP BY " + column
+            + ".name.last",
+        expecteds1);
+  }
+
+  /** Test a numerical function over json path expression in SELECT clause. */
+  @Test(dataProvider = "nativeJsonColumns")
+  public void testNumericalFunctionOverJsonPathSelectExpression(String column) {
+
+    // Test without user-specified alias.
+    Object[][] expecteds1 = {{181.0}};
+    checkResult("SELECT MAX(" + column + ".id) FROM testTable", expecteds1);
+
+    // Test with user-specified alias.
+    Object[][] expecteds2 = {{181.0}};
+    checkResult("SELECT MAX(" + column + ".id) AS x FROM testTable", expecteds2);
+
+    // Test with nested function calls (minus function being used within max function).
+    Object[][] expecteds3 = {{176.0}};
+    checkResult("SELECT MAX(" + column + ".id - 5) FROM testTable", expecteds3);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonPathQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonPathQueriesTest.java
index 097e6d9..036c237 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonPathQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonPathQueriesTest.java
@@ -18,44 +18,20 @@
  */
 package org.apache.pinot.queries;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class JsonPathQueriesTest extends BaseQueriesTest {
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "JsonDataTypeQueriesTest");
-  private static final String RAW_TABLE_NAME = "testTable";
-  private static final String SEGMENT_NAME = "testSegment";
-  private static final int NUM_RECORDS = 10;
-
-  private static final String INT_COLUMN = "intColumn";
-  private static final String LONG_COLUMN = "longColumn";
-  private static final String STRING_COLUMN = "stringColumn";
-  private static final String JSON_COLUMN = "jsonColumn";
-  private static final String JSON_COLUMN_WITHOUT_INDEX = "jsonColumnWithoutIndex";
+public class JsonPathQueriesTest extends BaseJsonQueryTest {
 
   private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
       .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
@@ -67,9 +43,6 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
   private static final TableConfig TABLE_CONFIG =
       new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
 
-  private IndexSegment _indexSegment;
-  private List<IndexSegment> _indexSegments;
-
   @Override
   protected String getFilter() {
     return "";
@@ -96,86 +69,14 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
     return record;
   }
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteDirectory(INDEX_DIR);
-
-    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
-    records.add(createRecord(1, 1, "daffy duck",
-        "{\"name\": {\"first\": \"daffy\", \"last\": \"duck\"}, \"id\": 101, \"data\": [\"a\", \"b\", \"c\", \"d\"]}"));
-    records.add(createRecord(2, 2, "mickey mouse",
-        "{\"name\": {\"first\": \"mickey\", \"last\": \"mouse\"}, \"id\": 111, \"data\": [\"e\", \"b\", \"c\", "
-            + "\"d\"]}"));
-    records.add(createRecord(3, 3, "donald duck",
-        "{\"name\": {\"first\": \"donald\", \"last\": \"duck\"}, \"id\": 121, \"data\": [\"f\", \"b\", \"c\", "
-            + "\"d\"]}"));
-    records.add(createRecord(4, 4, "scrooge mcduck",
-        "{\"name\": {\"first\": \"scrooge\", \"last\": \"mcduck\"}, \"id\": 131, \"data\": [\"g\", \"b\", \"c\", "
-            + "\"d\"]}"));
-    records.add(createRecord(5, 5, "minnie mouse",
-        "{\"name\": {\"first\": \"minnie\", \"last\": \"mouse\"}, \"id\": 141, \"data\": [\"h\", \"b\", \"c\", "
-            + "\"d\"]}"));
-    records.add(createRecord(6, 6, "daisy duck",
-        "{\"name\": {\"first\": \"daisy\", \"last\": \"duck\"}, \"id\": 161.5, \"data\": [\"i\", \"b\", \"c\", "
-            + "\"d\"]}"));
-    records.add(createRecord(7, 7, "pluto dog",
-        "{\"name\": {\"first\": \"pluto\", \"last\": \"dog\"}, \"id\": 161, \"data\": [\"j\", \"b\", \"c\", \"d\"]}"));
-    records.add(createRecord(8, 8, "goofy dwag",
-        "{\"name\": {\"first\": \"goofy\", \"last\": \"dwag\"}, \"id\": 171, \"data\": [\"k\", \"b\", \"c\", \"d\"]}"));
-    records.add(createRecord(9, 9, "ludwik von drake",
-        "{\"name\": {\"first\": \"ludwik\", \"last\": \"von drake\"}, \"id\": 181, \"data\": [\"l\", \"b\", \"c\", "
-            + "\"d\"]}"));
-    records.add(createRecord(10, 10, "nested array",
-        "{\"name\":{\"first\":\"nested\",\"last\":\"array\"},\"id\":111,\"data\":[{\"e\":[{\"x\":[{\"i1\":1,"
-            + "\"i2\":2}]},{\"y\":[{\"i1\":1,\"i2\":2}]},{\"z\":[{\"i1\":1,\"i2\":2}]}]},{\"b\":[{\"x\":[{\"i1\":1,"
-            + "\"i2\":2}]},{\"y\":[{\"i1\":1,\"i2\":2}]},{\"z\":[{\"i1\":10,\"i2\":20}]}]}]}"));
-    records.add(createRecord(11, 11, "multi-dimensional-1 array",
-        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],"
-            + "[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
-    records.add(createRecord(12, 12, "multi-dimensional-2 array",
-        "{\"name\": {\"first\": \"multi-dimensional-2\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],"
-            + "[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
-    records.add(createRecord(13, 13, "multi-dimensional-1 array",
-        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"id\": 111,\"data\": [[[1,2],[3,4]],"
-            + "[[\"a\",\"b\"],[\"c\",\"d\"]]]}"));
-    records.add(createRecord(13, 13, "days",
-        "{\"name\": {\"first\": \"multi-dimensional-1\",\"last\": \"array\"},\"days\": 111}"));
-    records.add(createRecord(14, 14, "top level array", "[{\"i1\":1,\"i2\":2}, {\"i1\":3,\"i2\":4}]"));
-
-    List<String> jsonIndexColumns = new ArrayList<>();
-    jsonIndexColumns.add("jsonColumn");
-    TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
-    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
-    driver.build();
-
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTableConfig(TABLE_CONFIG);
-    indexLoadingConfig.setJsonIndexColumns(new HashSet<String>(jsonIndexColumns));
-    indexLoadingConfig.setReadMode(ReadMode.mmap);
-
-    ImmutableSegment immutableSegment =
-        ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  @Override
+  TableConfig tableConfig() {
+    return TABLE_CONFIG;
   }
 
-  private void checkresult(String query, Object[][] expecteds) {
-    BrokerResponseNative response1 = getBrokerResponseForOptimizedSqlQuery(query, TABLE_CONFIG, SCHEMA);
-    List<Object[]> rows = response1.getResultTable().getRows();
-
-    Assert.assertEquals(rows.size(), expecteds.length);
-    for (int i = 0; i < rows.size(); i++) {
-      Object[] actual = rows.get(i);
-      Object[] expected = expecteds[i];
-      Assert.assertEquals(actual, expected);
-    }
+  @Override
+  Schema schema() {
+    return SCHEMA;
   }
 
   /** Test that a json path expression in SELECT list is properly converted to a JSON_EXTRACT_SCALAR function within
@@ -184,11 +85,11 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
   public void testJsonSelect() {
     // SELECT using a simple json path expression.
     Object[][] expecteds1 = {{"duck"}, {"mouse"}, {"duck"}};
-    checkresult("SELECT jsonColumn.name.last FROM testTable LIMIT 3", expecteds1);
+    checkResult("SELECT jsonColumn.name.last FROM testTable LIMIT 3", expecteds1);
 
     Object[][] expecteds2 =
         {{"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"null"}, {"1"}};
-    checkresult("SELECT jsonColumn.data[0].e[2].z[0].i1 FROM testTable", expecteds2);
+    checkResult("SELECT jsonColumn.data[0].e[2].z[0].i1 FROM testTable", expecteds2);
   }
 
   /** Test that a predicate comparing a json path expression with literal is properly converted into a JSON_MATCH
@@ -200,17 +101,17 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
         {{1, "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}",
             "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}", 1L,
             "daffy duck"}};
-    checkresult("SELECT * FROM testTable WHERE jsonColumn.name.first = 'daffy' LIMIT 1", expecteds1);
-    checkresult("SELECT * FROM testTable WHERE jsonColumnWithoutIndex.name.first = 'daffy' LIMIT 1", expecteds1);
+    checkResult("SELECT * FROM testTable WHERE jsonColumn.name.first = 'daffy' LIMIT 1", expecteds1);
+    checkResult("SELECT * FROM testTable WHERE jsonColumnWithoutIndex.name.first = 'daffy' LIMIT 1", expecteds1);
 
     // Comparing json path expression with a numerical value.
     Object[][] expecteds2 =
         {{1, "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}",
             "{\"name\":{\"first\":\"daffy\",\"last\":\"duck\"},\"id\":101,\"data\":[\"a\",\"b\",\"c\",\"d\"]}", 1L,
             "daffy duck"}};
-    checkresult("SELECT * FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" = 101') LIMIT 1", expecteds2);
+    checkResult("SELECT * FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" = 101') LIMIT 1", expecteds2);
     try {
-      checkresult("SELECT * FROM testTable WHERE JSON_MATCH(jsonColumnWithoutIndex, '\"$.id\" = 101') LIMIT 1",
+      checkResult("SELECT * FROM testTable WHERE JSON_MATCH(jsonColumnWithoutIndex, '\"$.id\" = 101') LIMIT 1",
           expecteds2);
       Assert.fail();
     } catch (IllegalStateException e) {
@@ -220,7 +121,7 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
 
     // Comparing json path expression with a string value.
     Object[][] expecteds3 = {{4L}};
-    checkresult("SELECT count(*) FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" IS NOT NULL') AND JSON_MATCH"
+    checkResult("SELECT count(*) FROM testTable WHERE JSON_MATCH(jsonColumn, '\"$.id\" IS NOT NULL') AND JSON_MATCH"
         + "(jsonColumn, '\"$.id\" = 101')", expecteds3);
   }
 
@@ -230,8 +131,8 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
     Object[][] expecteds1 =
         {{"111", 20L}, {"101", 4L}, {"null", 8L}, {"181", 4L}, {"161.5", 4L}, {"171", 4L}, {"161", 4L}, {"141", 4L},
             {"131", 4L}, {"121", 4L}};
-    checkresult("SELECT jsonColumn.id, count(*) FROM testTable GROUP BY jsonColumn.id", expecteds1);
-    checkresult("SELECT jsonColumnWithoutIndex.id, count(*) FROM testTable GROUP BY jsonColumnWithoutIndex.id",
+    checkResult("SELECT jsonColumn.id, count(*) FROM testTable GROUP BY jsonColumn.id", expecteds1);
+    checkResult("SELECT jsonColumnWithoutIndex.id, count(*) FROM testTable GROUP BY jsonColumnWithoutIndex.id",
         expecteds1);
   }
 
@@ -239,10 +140,10 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
   @Test
   public void testJsonGroupByHaving() {
     Object[][] expecteds1 = {{"mouse", 8L}};
-    checkresult(
+    checkResult(
         "SELECT jsonColumn.name.last, count(*) FROM testTable GROUP BY jsonColumn.name.last HAVING jsonColumn.name"
             + ".last = 'mouse'", expecteds1);
-    checkresult(
+    checkResult(
         "SELECT jsonColumnWithoutIndex.name.last, count(*) FROM testTable GROUP BY jsonColumnWithoutIndex.name.last "
             + "HAVING jsonColumnWithoutIndex.name.last = 'mouse'", expecteds1);
   }
@@ -251,10 +152,10 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
   @Test
   public void testJsonSelectFilterGroupBy() {
     Object[][] expecteds1 = {{"duck", 4L}};
-    checkresult(
+    checkResult(
         "SELECT jsonColumn.name.last, count(*) FROM testTable WHERE jsonColumn.id = 101 GROUP BY jsonColumn.name.last",
         expecteds1);
-    checkresult(
+    checkResult(
         "SELECT jsonColumnWithoutIndex.name.last, count(*) FROM testTable WHERE jsonColumnWithoutIndex.id = 101 GROUP"
             + " BY jsonColumnWithoutIndex.name.last", expecteds1);
   }
@@ -264,15 +165,15 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
   public void testTransformFunctionOverJsonPathSelectExpression() {
     // Apply string transform function on json path expression.
     Object[][] expecteds1 = {{"DAFFY"}};
-    checkresult("SELECT UPPER(jsonColumn.name.first) FROM testTable LIMIT 1", expecteds1);
-    checkresult("SELECT UPPER(jsonColumnWithoutIndex.name.first) FROM testTable LIMIT 1", expecteds1);
+    checkResult("SELECT UPPER(jsonColumn.name.first) FROM testTable LIMIT 1", expecteds1);
+    checkResult("SELECT UPPER(jsonColumnWithoutIndex.name.first) FROM testTable LIMIT 1", expecteds1);
 
     // Apply date transform function on json path expression and check for IS NULL
     Object[][] expecteds2 = {{Long.MIN_VALUE}};
-    checkresult("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NULL LIMIT 1",
+    checkResult("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NULL LIMIT 1",
         expecteds2);
     try {
-      checkresult(
+      checkResult(
           "SELECT FROMEPOCHDAYS(jsonColumnWithoutIndex.days) FROM testTable WHERE jsonColumnWithoutIndex.days IS NULL"
               + " LIMIT 1", expecteds2);
       Assert.fail();
@@ -283,10 +184,10 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
 
     // Apply date transform function on json path expression and check for IS NOT NULL
     Object[][] expecteds3 = {{9590400000L}};
-    checkresult("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NOT NULL LIMIT 1",
+    checkResult("SELECT FROMEPOCHDAYS(jsonColumn.days) FROM testTable WHERE jsonColumn.days IS NOT NULL LIMIT 1",
         expecteds3);
     try {
-      checkresult(
+      checkResult(
           "SELECT FROMEPOCHDAYS(jsonColumnWithoutIndex.days) FROM testTable WHERE jsonColumnWithoutIndex.days IS NOT "
               + "NULL LIMIT 1", expecteds3);
       Assert.fail();
@@ -302,18 +203,18 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
 
     // Test without user-specified alias.
     Object[][] expecteds1 = {{181.0}};
-    checkresult("SELECT MAX(jsonColumn.id) FROM testTable", expecteds1);
-    checkresult("SELECT MAX(jsonColumnWithoutIndex.id) FROM testTable", expecteds1);
+    checkResult("SELECT MAX(jsonColumn.id) FROM testTable", expecteds1);
+    checkResult("SELECT MAX(jsonColumnWithoutIndex.id) FROM testTable", expecteds1);
 
     // Test with user-specified alias.
     Object[][] expecteds2 = {{181.0}};
-    checkresult("SELECT MAX(jsonColumn.id) AS x FROM testTable", expecteds2);
-    checkresult("SELECT MAX(jsonColumnWithoutIndex.id) AS x FROM testTable", expecteds2);
+    checkResult("SELECT MAX(jsonColumn.id) AS x FROM testTable", expecteds2);
+    checkResult("SELECT MAX(jsonColumnWithoutIndex.id) AS x FROM testTable", expecteds2);
 
     // Test with nested function calls (minus function being used within max function).
     Object[][] expecteds3 = {{176.0}};
-    checkresult("SELECT MAX(jsonColumn.id - 5) FROM testTable", expecteds3);
-    checkresult("SELECT MAX(jsonColumnWithoutIndex.id - 5) FROM testTable", expecteds3);
+    checkResult("SELECT MAX(jsonColumn.id - 5) FROM testTable", expecteds3);
+    checkResult("SELECT MAX(jsonColumnWithoutIndex.id - 5) FROM testTable", expecteds3);
   }
 
   @Test
@@ -321,17 +222,17 @@ public class JsonPathQueriesTest extends BaseQueriesTest {
     // SELECT using json path expressions that refers to second element of a top-level array.
     Object[][] expecteds1 = {{"{\"i1\":3,\"i2\":4}"}, {"{\"i1\":3,\"i2\":4}"}, {"{\"i1\":3,\"i2\":4}"}, {"{\"i1\":3,"
         + "\"i2\":4}"}};
-    checkresult("SELECT jsonColumn[1] FROM testTable WHERE intColumn=14", expecteds1);
+    checkResult("SELECT jsonColumn[1] FROM testTable WHERE intColumn=14", expecteds1);
 
     // SELECT using json path expressions that refers to item within second element of a top-level array.
     Object[][] expecteds2 = {{"4"}, {"4"}, {"4"}, {"4"}};
-    checkresult("SELECT jsonColumn[1].i2 FROM testTable WHERE intColumn=14", expecteds2);
+    checkResult("SELECT jsonColumn[1].i2 FROM testTable WHERE intColumn=14", expecteds2);
 
     // SELECT using json path expression and check path expression for IS NULL.
-    checkresult("SELECT jsonColumn[1].i2 FROM testTable WHERE jsonColumn[1].i2 IS NOT NULL", expecteds2);
+    checkResult("SELECT jsonColumn[1].i2 FROM testTable WHERE jsonColumn[1].i2 IS NOT NULL", expecteds2);
 
     // GROUP BY using a json path expression that refers to a top-level array element.
     Object[][] expecteds3 = {{"{\"i1\":3,\"i2\":4}", 4L}, {"null", 56L}};
-    checkresult("SELECT jsonColumn[1], count(*) FROM testTable GROUP BY jsonColumn[1]", expecteds3);
+    checkResult("SELECT jsonColumn[1], count(*) FROM testTable GROUP BY jsonColumn[1]", expecteds3);
   }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
new file mode 100644
index 0000000..64badd3
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/TransformEvaluator.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.evaluator;
+
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+
+
+/**
+ * Evaluates a transform directly against the storage layer (forward index and dictionary),
+ * avoiding intermediate materialisation of the column's values.
+ * This is an evolving SPI and subject to change.
+ */
+public interface TransformEvaluator {
+  /**
+   * Evaluate the transform and fill the value buffer
+   * @param docIds the doc ids to evaluate the transform for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length, ForwardIndexReader<T> reader,
+      T context, Dictionary dictionary, int[] dictIdBuffer, int[] valueBuffer);
+
+  /**
+   * Evaluate the transform and fill the value buffer
+   * @param docIds the doc ids to evaluate the transform for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length, ForwardIndexReader<T> reader,
+      T context, Dictionary dictionary, int[] dictIdBuffer, long[] valueBuffer);
+
+  /**
+   * Evaluate the transform and fill the value buffer
+   * @param docIds the doc ids to evaluate the transform for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length, ForwardIndexReader<T> reader,
+      T context, Dictionary dictionary, int[] dictIdBuffer, float[] valueBuffer);
+
+  /**
+   * Evaluate the transform and fill the value buffer
+   * @param docIds the doc ids to evaluate the transform for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length, ForwardIndexReader<T> reader,
+      T context, Dictionary dictionary, int[] dictIdBuffer, double[] valueBuffer);
+
+  /**
+   * Evaluate the transform and fill the value buffer
+   * @param docIds the doc ids to evaluate the transform for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length, ForwardIndexReader<T> reader,
+      T context, Dictionary dictionary, int[] dictIdBuffer, String[] valueBuffer);
+
+  /**
+   * Evaluate the JSON path and fill the value buffer
+   * @param docIds the doc ids to evaluate the JSON path for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdsBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, int[][] valueBuffer);
+
+  /**
+   * Evaluate the JSON path and fill the value buffer
+   * @param docIds the doc ids to evaluate the JSON path for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdsBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, long[][] valueBuffer);
+
+  /**
+   * Evaluate the JSON path and fill the value buffer
+   * @param docIds the doc ids to evaluate the JSON path for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdsBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, float[][] valueBuffer);
+
+  /**
+   * Evaluate the JSON path and fill the value buffer
+   * @param docIds the doc ids to evaluate the JSON path for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdsBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, double[][] valueBuffer);
+
+  /**
+   * Evaluate the JSON path and fill the value buffer
+   * @param docIds the doc ids to evaluate the JSON path for
+   * @param length the number of doc ids to evaluate for
+   * @param reader the ForwardIndexReader
+   * @param context the reader context
+   * @param dictionary the dictionary
+   * @param dictIdsBuffer a buffer for dictionary ids if required
+   * @param valueBuffer the values to fill
+   * @param <T> type of the reader context
+   */
+  <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
+      ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdsBuffer, String[][] valueBuffer);
+}
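
To make the contract concrete, here is a minimal sketch of the single-valued int overload, assuming a string column that may be either dictionary-encoded or raw; ExampleEvaluator and its evaluate helper are hypothetical, the remaining overloads are elided, and only the reader and dictionary calls come from the SPI:

    public final class ExampleEvaluator implements TransformEvaluator {
      @Override
      public <T extends ForwardIndexReaderContext> void evaluateBlock(int[] docIds, int length,
          ForwardIndexReader<T> reader, T context, Dictionary dictionary, int[] dictIdBuffer,
          int[] valueBuffer) {
        if (dictionary != null) {
          // dictionary-encoded column: resolve dict ids, then look the raw values up in the dictionary
          reader.readDictIds(docIds, length, dictIdBuffer, context);
          for (int i = 0; i < length; i++) {
            valueBuffer[i] = evaluate(dictionary.getStringValue(dictIdBuffer[i]));
          }
        } else {
          // raw column: read each value directly from the forward index
          for (int i = 0; i < length; i++) {
            valueBuffer[i] = evaluate(reader.getString(docIds[i], context));
          }
        }
      }

      private int evaluate(String json) {
        // a real evaluator would extract a value from the JSON here
        return json.length();
      }
      // ... the other overloads follow the same pattern
    }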
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java
new file mode 100644
index 0000000..034151e
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluator.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.evaluator.json;
+
+import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
+
+/**
+ * An empty marker interface which allows JSON path evaluation to be
+ * extended without affecting {@link TransformEvaluator}.
+ *
+ * This is an evolving SPI and subject to change.
+ */
+public interface JsonPathEvaluator extends TransformEvaluator {
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluatorProvider.java
new file mode 100644
index 0000000..4de6025
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluatorProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.evaluator.json;
+
+/**
+ * This is an evolving SPI and subject to change.
+ */
+public interface JsonPathEvaluatorProvider {
+  /**
+   * Create a {@link JsonPathEvaluator}
+   * @param delegate the default evaluator to delegate to for standard storage types
+   * @param jsonPath the JSON path as a string
+   * @param defaultValue the default value
+   * @return a {@link JsonPathEvaluator}
+   */
+  JsonPathEvaluator create(JsonPathEvaluator delegate, String jsonPath, Object defaultValue);
+}
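
To show how the delegation is intended to work, a plugin's provider might look roughly like the sketch below; PluginJsonPathEvaluator is a hypothetical plugin class which evaluates against the plugin's custom storage format and falls back to the delegate for standard storage types:

    public final class PluginJsonPathEvaluatorProvider implements JsonPathEvaluatorProvider {
      @Override
      public JsonPathEvaluator create(JsonPathEvaluator delegate, String jsonPath, Object defaultValue) {
        // wrap the default evaluator: the wrapper inspects the reader it is given and
        // hands anything it does not recognise off to the delegate
        return new PluginJsonPathEvaluator(delegate, jsonPath, defaultValue);
      }
    }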
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java
new file mode 100644
index 0000000..0106d3b
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/evaluator/json/JsonPathEvaluators.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.evaluator.json;
+
+import com.google.common.base.Preconditions;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Allows registration of a custom {@link JsonPathEvaluator} which can handle custom storage
+ * formats provided by a plugin. A default evaluator which can handle all standard
+ * storage types will be provided to delegate to when standard storage types are encountered.
+ *
+ * This is an evolving SPI and subject to change.
+ */
+public final class JsonPathEvaluators {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JsonPathEvaluators.class);
+
+  private static final AtomicReferenceFieldUpdater<JsonPathEvaluators, JsonPathEvaluatorProvider> UPDATER =
+      AtomicReferenceFieldUpdater.newUpdater(JsonPathEvaluators.class, JsonPathEvaluatorProvider.class, "_provider");
+  private static final JsonPathEvaluators INSTANCE = new JsonPathEvaluators();
+  private static final DefaultProvider DEFAULT_PROVIDER = new DefaultProvider();
+  private volatile JsonPathEvaluatorProvider _provider;
+
+  /**
+   * Registration point to override how JSON paths are evaluated. This should be used
+   * when a Pinot plugin has special storage capabilities. For instance, imagine a
+   * plugin with a raw forward index which stores JSON in a binary format which
+   * pinot-core is unaware of and cannot evaluate JSON paths against (pinot-core only
+   * understands true JSON documents). Whenever JSON paths are evaluated against this
+   * custom storage, different storage access operations may be required, and the provided
+   * {@link JsonPathEvaluator} can inspect the provided {@code ForwardIndexReader} to
+   * determine whether it is the custom implementation and evaluate the JSON path against
+   * the binary JSON managed by the custom reader. If it is not the custom implementation,
+   * then evaluation should be handed off to the provided delegate.
+   *
+   * This prevents the {@code ForwardIndexReader} interface from needing to model
+   * any plugin storage format, which creates flexibility in the kinds of data structures
+   * plugins can employ.
+   *
+   * @param provider provides {@link JsonPathEvaluator} instances
+   * @return true if registration is successful, false otherwise
+   */
+  public static boolean registerProvider(JsonPathEvaluatorProvider provider) {
+    Preconditions.checkArgument(provider != null, "JsonPathEvaluatorProvider must not be null");
+    if (!UPDATER.compareAndSet(INSTANCE, null, provider)) {
+      LOGGER.warn("failed to register {} - {} already registered", provider, INSTANCE._provider);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * pinot-core must construct {@link JsonPathEvaluator} via this method to ensure it uses
+   * the registered implementation. Using the registered implementation allows pinot-core
+   * to evaluate JSON paths against data structures it doesn't understand or model.
+   * @param jsonPath the JSON path
+   * @param defaultValue the default value
+   * @return a JSON path evaluator which must understand all possible storage representations of JSON.
+   */
+  public static JsonPathEvaluator create(String jsonPath, Object defaultValue) {
+    // plugins compose and delegate to the default implementation.
+    JsonPathEvaluator defaultEvaluator = DEFAULT_PROVIDER.create(jsonPath, defaultValue);
+    return Holder.PROVIDER.create(defaultEvaluator, jsonPath, defaultValue);
+  }
+
+  /**
+   * Storing the registered evaluator in this holder and initialising it during
+   * the class load gives the best of both worlds: plugins have until the first
+   * JSON path evaluation to register an evaluator via
+   * {@link JsonPathEvaluators#registerProvider}, but once this class is loaded,
+   * the provider is constant and calls may be optimised aggressively by the JVM
+   * in ways which are impossible with a volatile reference.
+   */
+  private static final class Holder {
+    static final JsonPathEvaluatorProvider PROVIDER;
+
+    static {
+      JsonPathEvaluatorProvider provider = JsonPathEvaluators.INSTANCE._provider;
+      if (provider == null) {
+        provider = DEFAULT_PROVIDER;
+        if (!UPDATER.compareAndSet(INSTANCE, null, provider)) {
+          provider = JsonPathEvaluators.INSTANCE._provider;
+        }
+      }
+      PROVIDER = provider;
+    }
+  }
+
+  private static class DefaultProvider implements JsonPathEvaluatorProvider {
+
+    // default implementation uses MethodHandles to avoid pulling lots of implementation details into the SPI layer
+
+    private static final MethodHandle FACTORY;
+
+    static {
+      String className = "org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator";
+      MethodHandle factory = null;
+      try {
+        Class<?> clazz = Class.forName(className, false, JsonPathEvaluators.class.getClassLoader());
+        factory = MethodHandles.publicLookup()
+            .findStatic(clazz, "create", MethodType.methodType(JsonPathEvaluator.class, String.class, Object.class));
+      } catch (Throwable implausible) {
+        LOGGER.error("could not construct MethodHandle for {}", className, implausible);
+      }
+      FACTORY = factory;
+    }
+
+    public JsonPathEvaluator create(String jsonPath, Object defaultValue) {
+      return create(null, jsonPath, defaultValue);
+    }
+
+    @Override
+    public JsonPathEvaluator create(JsonPathEvaluator delegate, String jsonPath, Object defaultValue) {
+      // the default provider sits at the root of the delegation chain, so the delegate is ignored
+      try {
+        return (JsonPathEvaluator) FACTORY.invokeExact(jsonPath, defaultValue);
+      } catch (IllegalArgumentException e) {
+        throw e;
+      } catch (Throwable t) {
+        throw new RuntimeException(t);
+      }
+    }
+  }
+}
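
End to end, a plugin registers its provider once during startup, before the first JSON path evaluation (the Holder freezes the provider on first use), and pinot-core then obtains evaluators via the factory method. A rough sketch, with the provider class and the path being hypothetical:

    // during plugin initialisation, before any query evaluates a JSON path
    boolean registered = JsonPathEvaluators.registerProvider(new PluginJsonPathEvaluatorProvider());

    // later, in pinot-core, wherever a JSON path needs evaluating
    JsonPathEvaluator evaluator = JsonPathEvaluators.create("$.campaign.budget", -1L);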
