Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:32:28 UTC

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #29067: [SPARK-32274][SQL] Make SQL cache serialization pluggable

dongjoon-hyun commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r461865047



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter batches prior to being decompressed.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
+   * necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
+   * provides the APIs to hold those metrics and explains the metrics used, really just min and max.
+   * Note that this is intended to skip batches that are not needed, and the actual filtering of
+   * individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be decompressed.
+   */
+  def buildFilter(
+      predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Can `decompressColumnar()` be called instead of `decompressToRows()` for this given
+   * schema? True if it can and false if it cannot. Columnar output is typically preferred
+   * because it is more efficient. Note that `decompressToRows()` must always be supported
+   * as there are other checks that can force row based output.
+   * @param schema the schema of the data being checked.
+   * @return true if columnar output should be used for this schema, else false.
+   */
+  def supportsColumnarOutput(schema: StructType): Boolean
+
+  /**
+   * The exact java types of the columns that are output in columnar processing mode. This
+   * is a performance optimization for code generation and is optional.
+   * @param attributes the attributes to be output.
+   * @param conf the config for the query that will read the data.
+   */
+  def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] = None
+
+  /**
+   * Convert the cached data into a ColumnarBatch. This currently is only used if
+   * `supportsColumnar()` returned true for the associated schema, but there are other checks

Review comment:
       `returned` -> `returns`?
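
To make the proposed API concrete for readers following the thread, below is a minimal Scala sketch of a serializer built against the trait exactly as it appears in this diff. Everything named here (RowArrayCachedBatch, RowArraySerializer, rowsPerBatch, the 64-byte row-size estimate) is a hypothetical illustration rather than code from this PR, and it deliberately skips columnar support, batch-level filtering, and column pruning.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
    import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.vectorized.ColumnarBatch
    import org.apache.spark.storage.StorageLevel

    // Hypothetical batch format: an array of copied rows plus the two required metrics.
    case class RowArrayCachedBatch(rows: Array[InternalRow]) extends CachedBatch {
      override def numRows: Int = rows.length
      // Rough per-row estimate; a real implementation would track actual sizes.
      override def sizeInBytes: Long = rows.length.toLong * 64
    }

    class RowArraySerializer extends CachedBatchSerializer {
      private val rowsPerBatch = 10000

      // Row-based input only in this sketch.
      override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = false

      override def convertForCacheColumnar(
          input: RDD[ColumnarBatch],
          schema: Seq[Attribute],
          storageLevel: StorageLevel,
          conf: SQLConf): RDD[CachedBatch] =
        throw new IllegalStateException("Columnar input is not supported by this sketch")

      override def convertForCache(
          input: RDD[InternalRow],
          schema: Seq[Attribute],
          storageLevel: StorageLevel,
          conf: SQLConf): RDD[CachedBatch] =
        input.mapPartitions { rows =>
          // Rows must be copied because Spark reuses the underlying row objects.
          rows.map(_.copy()).grouped(rowsPerBatch)
            .map(group => RowArrayCachedBatch(group.toArray): CachedBatch)
        }

      // No per-batch stats are kept, so never skip a batch.
      override def buildFilter(
          predicates: Seq[Expression],
          cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] =
        (_, batches) => batches

      override def supportsColumnarOutput(schema: StructType): Boolean = false

      override def convertFromCacheColumnar(
          input: RDD[CachedBatch],
          cacheAttributes: Seq[Attribute],
          selectedAttributes: Seq[Attribute],
          conf: SQLConf): RDD[ColumnarBatch] =
        throw new IllegalStateException("Columnar output is not supported by this sketch")

      override def convertFromCache(
          input: RDD[CachedBatch],
          cacheAttributes: Seq[Attribute],
          selectedAttributes: Seq[Attribute],
          conf: SQLConf): RDD[InternalRow] =
        // A real implementation would also project down to selectedAttributes.
        input.flatMap(_.asInstanceOf[RowArrayCachedBatch].rows)
    }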

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter batches prior to being decompressed.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
+   * necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
+   * provides the APIs to hold those metrics and explains the metrics used, really just min and max.
+   * Note that this is intended to skip batches that are not needed, and the actual filtering of
+   * individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be decompressed.
+   */
+  def buildFilter(
+      predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Can `decompressColumnar()` be called instead of `decompressToRows()` for this given
+   * schema? True if it can and false if it cannot. Columnar output is typically preferred
+   * because it is more efficient. Note that `decompressToRows()` must always be supported
+   * as there are other checks that can force row based output.
+   * @param schema the schema of the data being checked.
+   * @return true if columnar output should be used for this schema, else false.
+   */
+  def supportsColumnarOutput(schema: StructType): Boolean
+
+  /**
+   * The exact java types of the columns that are output in columnar processing mode. This
+   * is a performance optimization for code generation and is optional.
+   * @param attributes the attributes to be output.
+   * @param conf the config for the query that will read the data.
+   */
+  def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] = None
+
+  /**
+   * Convert the cached data into a ColumnarBatch. This currently is only used if
+   * `supportsColumnar()` returned true for the associated schema, but there are other checks
+   * that can force row based output. One of the main advantages of doing columnar output over row
+   * based output is that the code generation is more standard and can be combined with code
+   * generation for downstream operations.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return an RDD of the input cached batches transformed into the ColumnarBatch format.
+   */
+  def convertFromCacheColumnar(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Convert the cached batch into `InternalRow`s. If you want this to be performant, code
+   * generation is advised.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output rows.
+   * @param conf the configuration for the job.
+   * @return RDD of the rows that were stored in the cached batches.
+   */
+  def convertFromCache(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
+
+/**
+ * A [[CachedBatch]] that stores some simple metrics that can be used for filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ * The metrics are returned by the stats value. For each column in the batch 5 columns of metadata
+ * are needed in the row.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait SimpleMetricsCachedBatch extends CachedBatch {
+  /**
+   * Holds the same as ColumnStats.

Review comment:
       Sorry, but this is technically a little misleading because the `ColumnStats` trait doesn't have `upperBound` and `lowerBound`. Since `upperBound` and `lowerBound` need to hold a value, they are designed to be declared in the derived classes like `BooleanColumnStats`.
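
For anyone reading along without the `ColumnStats` source handy, the design being described looks roughly like the simplified Scala sketch below: the base trait only carries the type-agnostic counters, and each typed subclass declares its own bound fields because their types differ per column type. The names here paraphrase the idea and are not the actual Spark source.

    // Simplified sketch of the pattern, not the real ColumnStats hierarchy.
    trait SimpleColumnStats {
      protected var nullCount: Int = 0
      protected var count: Int = 0
      protected var sizeInBytes: Long = 0L
      // Per-column stats in the five-field layout the cache expects:
      // upperBound, lowerBound, nullCount, rowCount, sizeInBytes.
      def collectedStatistics: Array[Any]
    }

    class BooleanSimpleColumnStats extends SimpleColumnStats {
      // The bounds live here, in the derived class, so they can have a concrete type.
      protected var upper: Boolean = false
      protected var lower: Boolean = true
      override def collectedStatistics: Array[Any] =
        Array[Any](upper, lower, nullCount, count, sizeInBytes)
    }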

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter batches prior to being decompressed.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
+   * necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
+   * provides the APIs to hold those metrics and explains the metrics used, really just min and max.
+   * Note that this is intended to skip batches that are not needed, and the actual filtering of
+   * individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be decompressed.
+   */
+  def buildFilter(
+      predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Can `decompressColumnar()` be called instead of `decompressToRows()` for this given
+   * schema? True if it can and false if it cannot. Columnar output is typically preferred
+   * because it is more efficient. Note that `decompressToRows()` must always be supported
+   * as there are other checks that can force row based output.
+   * @param schema the schema of the data being checked.
+   * @return true if columnar output should be used for this schema, else false.
+   */
+  def supportsColumnarOutput(schema: StructType): Boolean
+
+  /**
+   * The exact java types of the columns that are output in columnar processing mode. This
+   * is a performance optimization for code generation and is optional.
+   * @param attributes the attributes to be output.
+   * @param conf the config for the query that will read the data.
+   */
+  def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] = None
+
+  /**
+   * Convert the cached data into a ColumnarBatch. This currently is only used if
+   * `supportsColumnar()` returned true for the associated schema, but there are other checks
+   * that can force row based output. One of the main advantages of doing columnar output over row
+   * based output is that the code generation is more standard and can be combined with code
+   * generation for downstream operations.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return an RDD of the input cached batches transformed into the ColumnarBatch format.
+   */
+  def convertFromCacheColumnar(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Convert the cached batch into `InternalRow`s. If you want this to be performant, code
+   * generation is advised.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output rows.
+   * @param conf the configuration for the job.
+   * @return RDD of the rows that were stored in the cached batches.
+   */
+  def convertFromCache(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
+
+/**
+ * A [[CachedBatch]] that stores some simple metrics that can be used for filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ * The metrics are returned by the stats value. For each column in the batch 5 columns of metadata
+ * are needed in the row.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait SimpleMetricsCachedBatch extends CachedBatch {
+  /**
+   * Holds the same as ColumnStats.

Review comment:
       Sorry, but this is technically a little misleading because the `ColumnStats` trait itself doesn't have `upperBound` and `lowerBound`. Since `upperBound` and `lowerBound` need to hold a value, they are designed to be declared in the derived classes like `BooleanColumnStats`.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter batches prior to being decompressed.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
+   * necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
+   * provides the APIs to hold those metrics and explains the metrics used, really just min and max.
+   * Note that this is intended to skip batches that are not needed, and the actual filtering of
+   * individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be decompressed.
+   */
+  def buildFilter(
+      predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Can `decompressColumnar()` be called instead of `decompressToRows()` for this given
+   * schema? True if it can and false if it cannot. Columnar output is typically preferred
+   * because it is more efficient. Note that `decompressToRows()` must always be supported
+   * as there are other checks that can force row based output.
+   * @param schema the schema of the data being checked.
+   * @return true if columnar output should be used for this schema, else false.
+   */
+  def supportsColumnarOutput(schema: StructType): Boolean
+
+  /**
+   * The exact java types of the columns that are output in columnar processing mode. This
+   * is a performance optimization for code generation and is optional.
+   * @param attributes the attributes to be output.
+   * @param conf the config for the query that will read the data.
+   */
+  def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] = None
+
+  /**
+   * Convert the cached data into a ColumnarBatch. This currently is only used if
+   * `supportsColumnar()` returned true for the associated schema, but there are other checks
+   * that can force row based output. One of the main advantages of doing columnar output over row
+   * based output is that the code generation is more standard and can be combined with code
+   * generation for downstream operations.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return an RDD of the input cached batches transformed into the ColumnarBatch format.
+   */
+  def convertFromCacheColumnar(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Convert the cached batch into `InternalRow`s. If you want this to be performant, code
+   * generation is advised.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output rows.
+   * @param conf the configuration for the job.
+   * @return RDD of the rows that were stored in the cached batches.
+   */
+  def convertFromCache(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
+
+/**
+ * A [[CachedBatch]] that stores some simple metrics that can be used for filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ * The metrics are returned by the stats value. For each column in the batch 5 columns of metadata
+ * are needed in the row.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait SimpleMetricsCachedBatch extends CachedBatch {
+  /**
+   * Holds the same as ColumnStats.

Review comment:
       Sorry, but this is technically a little misleading because the `ColumnStats` trait itself doesn't have `upperBound` and `lowerBound`. Since `upperBound` and `lowerBound` need to hold a concrete value, they are designed to be declared in the derived classes like `BooleanColumnStats`.
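
Separately from the stats discussion, the `buildFilter` contract documented in this hunk is easy to picture with a short sketch: the serializer builds one closure on the driver, and the read path applies it per partition with that partition's index. The `skipBatches` helper below is hypothetical and only illustrates the shape of the call; the real scan node wiring in Spark differs.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
    import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}

    // Hypothetical helper showing how the returned filter function is applied.
    def skipBatches(
        serializer: CachedBatchSerializer,
        predicates: Seq[Expression],
        cachedAttributes: Seq[Attribute],
        cached: RDD[CachedBatch]): RDD[CachedBatch] = {
      val batchFilter = serializer.buildFilter(predicates, cachedAttributes)
      cached.mapPartitionsWithIndex { (partitionIndex, batches) =>
        // Batches dropped here are never decompressed; row-level filtering happens later.
        batchFilter(partitionIndex, batches)
      }
    }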

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter batches prior to being decompressed.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
+   * necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
+   * provides the APIs to hold those metrics and explains the metrics used, really just min and max.
+   * Note that this is intended to skip batches that are not needed, and the actual filtering of
+   * individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be decompressed.
+   */
+  def buildFilter(
+      predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Can `decompressColumnar()` be called instead of `decompressToRows()` for this given
+   * schema? True if it can and false if it cannot. Columnar output is typically preferred
+   * because it is more efficient. Note that `decompressToRows()` must always be supported
+   * as there are other checks that can force row based output.
+   * @param schema the schema of the data being checked.
+   * @return true if columnar output should be used for this schema, else false.
+   */
+  def supportsColumnarOutput(schema: StructType): Boolean
+
+  /**
+   * The exact java types of the columns that are output in columnar processing mode. This
+   * is a performance optimization for code generation and is optional.
+   * @param attributes the attributes to be output.
+   * @param conf the config for the query that will read the data.
+   */
+  def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] = None
+
+  /**
+   * Convert the cached data into a ColumnarBatch. This currently is only used if
+   * `supportsColumnar()` returned true for the associated schema, but there are other checks
+   * that can force row based output. One of the main advantages of doing columnar output over row
+   * based output is that the code generation is more standard and can be combined with code
+   * generation for downstream operations.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return an RDD of the input cached batches transformed into the ColumnarBatch format.
+   */
+  def convertFromCacheColumnar(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Convert the cached batch into `InternalRow`s. If you want this to be performant, code
+   * generation is advised.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output rows.
+   * @param conf the configuration for the job.
+   * @return RDD of the rows that were stored in the cached batches.
+   */
+  def convertFromCache(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
+
+/**
+ * A [[CachedBatch]] that stores some simple metrics that can be used for filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ * The metrics are returned by the stats value. For each column in the batch 5 columns of metadata
+ * are needed in the row.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait SimpleMetricsCachedBatch extends CachedBatch {
+  /**
+   * Holds the same as ColumnStats.
+   * upperBound (optional), lowerBound (Optional), nullCount: Int, rowCount: Int, sizeInBytes: Long
+   * Which is repeated for each column in the original data.
+   */
+  val stats: InternalRow
+  override def sizeInBytes: Long =
+    Range.apply(4, stats.numFields, 5).map(stats.getLong).sum
+}
+
+// Currently, only use statistics from atomic types except binary type only.

Review comment:
       nit. Maybe, `binary type only` -> `binary type`?
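
A small worked example may help with the five-fields-per-column layout used above and with why `sizeInBytes` reads every fifth field starting at index 4. The literal values are made up purely for illustration.

    import org.apache.spark.sql.catalyst.InternalRow

    // Two cached columns -> 2 * 5 = 10 stats fields, laid out per the doc comment above:
    //   upperBound, lowerBound, nullCount: Int, rowCount: Int, sizeInBytes: Long
    val stats = InternalRow(
      42, 1, 0, 1000, 4000L,       // hypothetical int column: 1000 rows, 4000 bytes
      9.5d, -1.0d, 3, 1000, 8000L) // hypothetical double column: 1000 rows, 8000 bytes

    // SimpleMetricsCachedBatch.sizeInBytes sums the per-column sizeInBytes slots,
    // i.e. indices 4 and 9 here: 4000 + 8000 = 12000.
    val totalSize = Range(4, stats.numFields, 5).map(stats.getLong).sum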

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter batches prior to being decompressed.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
+   * necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
+   * provides the APIs to hold those metrics and explains the metrics used, really just min and max.
+   * Note that this is intended to skip batches that are not needed, and the actual filtering of
+   * individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be decompressed.
+   */
+  def buildFilter(
+      predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Can `decompressColumnar()` be called instead of `decompressToRows()` for this given
+   * schema? True if it can and false if it cannot. Columnar output is typically preferred
+   * because it is more efficient. Note that `decompressToRows()` must always be supported
+   * as there are other checks that can force row based output.
+   * @param schema the schema of the data being checked.
+   * @return true if columnar output should be used for this schema, else false.
+   */
+  def supportsColumnarOutput(schema: StructType): Boolean
+
+  /**
+   * The exact java types of the columns that are output in columnar processing mode. This
+   * is a performance optimization for code generation and is optional.
+   * @param attributes the attributes to be output.
+   * @param conf the config for the query that will read the data.
+   */
+  def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] = None
+
+  /**
+   * Convert the cached data into a ColumnarBatch. This currently is only used if
+   * `supportsColumnar()` returned true for the associated schema, but there are other checks
+   * that can force row based output. One of the main advantages of doing columnar output over row
+   * based output is that the code generation is more standard and can be combined with code
+   * generation for downstream operations.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return an RDD of the input cached batches transformed into the ColumnarBatch format.
+   */
+  def convertFromCacheColumnar(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Convert the cached batch into `InternalRow`s. If you want this to be performant, code
+   * generation is advised.
+   * @param input the cached batches that should be converted.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the fields that should be loaded from the data and the order they
+   *                           should appear in the output rows.
+   * @param conf the configuration for the job.
+   * @return RDD of the rows that were stored in the cached batches.
+   */
+  def convertFromCache(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
+
+/**
+ * A [[CachedBatch]] that stores some simple metrics that can be used for filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ * The metrics are returned by the stats value. For each column in the batch 5 columns of metadata
+ * are needed in the row.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait SimpleMetricsCachedBatch extends CachedBatch {
+  /**
+   * Holds the same as ColumnStats.

Review comment:
       Sorry, but this may be technically a little misleading because the `ColumnStats` trait itself doesn't have `upperBound` and `lowerBound`. Since `upperBound` and `lowerBound` need to hold a concrete value, they are designed to be declared in the derived classes like `BooleanColumnStats`. That said, every concrete `ColumnStats` instance does end up holding them.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,33 +19,186 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer, SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
+import org.apache.spark.sql.execution.{ColumnarToRowTransition, InputAdapter, QueryExecution, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
-
+import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
- * CachedBatch is a cached batch of rows.
+ * The default implementation of CachedBatch.
  *
  * @param numRows The total number of rows in this batch
  * @param buffers The buffers for serialized columns
  * @param stats The stat of columns
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+    extends SimpleMetricsCachedBatch

Review comment:
       nit. Indentation.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,33 +19,186 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer, SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
+import org.apache.spark.sql.execution.{ColumnarToRowTransition, InputAdapter, QueryExecution, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
-
+import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
- * CachedBatch is a cached batch of rows.
+ * The default implementation of CachedBatch.
  *
  * @param numRows The total number of rows in this batch
  * @param buffers The buffers for serialized columns
  * @param stats The stat of columns
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+    extends SimpleMetricsCachedBatch
+
+/**
+ * The default implementation of CachedBatchSerializer.
+ */
+class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
+  override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = false
+
+  override def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch] =
+    throw new IllegalStateException("Columnar input is not supported")
+
+  override def convertForCache(input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch] = {
+    val batchSize = conf.columnBatchSize
+    val useCompression = conf.useCompression
+    convertForCacheInternal(input, schema, batchSize, useCompression)
+  }
+
+  def convertForCacheInternal(input: RDD[InternalRow],
+      output: Seq[Attribute],
+      batchSize: Int,
+      useCompression: Boolean): RDD[CachedBatch] = {
+    input.mapPartitionsInternal { rowIterator =>
+      new Iterator[DefaultCachedBatch] {
+        def next(): DefaultCachedBatch = {
+          val columnBuilders = output.map { attribute =>
+            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
+          }.toArray
+
+          var rowCount = 0
+          var totalSize = 0L
+          while (rowIterator.hasNext && rowCount < batchSize
+              && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
+            val row = rowIterator.next()
+
+            // Added for SPARK-6082. This assertion can be useful for scenarios when something
+            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
+            // may result in malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
+            // hard to decipher.
+            assert(
+              row.numFields == columnBuilders.length,
+              s"Row column number mismatch, expected ${output.size} columns, " +
+                  s"but got ${row.numFields}." +
+                  s"\nRow content: $row")
+
+            var i = 0
+            totalSize = 0
+            while (i < row.numFields) {
+              columnBuilders(i).appendFrom(row, i)
+              totalSize += columnBuilders(i).columnStats.sizeInBytes
+              i += 1
+            }
+            rowCount += 1
+          }
+
+          val stats = InternalRow.fromSeq(
+            columnBuilders.flatMap(_.columnStats.collectedStatistics).toSeq)
+          DefaultCachedBatch(rowCount, columnBuilders.map { builder =>
+            JavaUtils.bufferToArray(builder.build())
+          }, stats)
+        }
+
+        def hasNext: Boolean = rowIterator.hasNext
+      }
+    }
+  }
+
+  override def supportsColumnarOutput(schema: StructType): Boolean = schema.fields.forall(f =>
+    f.dataType match {
+        // More types can be supported, but this is to match the original implementation that
+        // only supported primitive types "for ease of review"

Review comment:
       indentation?
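
For reference, a possible layout for that `match` (an editorial sketch, not the PR's exact continuation, which is truncated above; the whitelist simply mirrors the primitive types imported at the top of the hunk):

```
override def supportsColumnarOutput(schema: StructType): Boolean =
  schema.fields.forall(f => f.dataType match {
    // More types can be supported, but this matches the original
    // implementation that only supported primitive types.
    case BooleanType | ByteType | ShortType | IntegerType | LongType |
        FloatType | DoubleType => true
    case _ => false
  })
```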

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter batches prior to being decompressed.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
+   * necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
+   * provides the APIs to hold those metrics and explains the metrics used, really just min and max.
+   * Note that this is intended to skip batches that are not needed, and the actual filtering of
+   * individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be decompressed.
+   */
+  def buildFilter(
+      predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Can `decompressColumnar()` be called instead of `decompressToRows()` for this given
+   * schema? True if it can and false if it cannot. Columnar output is typically preferred
+   * because it is more efficient. Note that `decompressToRows()` must always be supported
+   * as there are other checks that can force row based output.
+   * @param schema the schema of the data being checked.
+   * @return true if columnar output should be used for this schema, else false.
+   */
+  def supportsColumnarOutput(schema: StructType): Boolean
+
+  /**
+   * The exact java types of the columns that are output in columnar processing mode. This

Review comment:
       Just a question: does `The exact java types of the columns` here simply mean one of `OnHeapColumnVector` and `OffHeapColumnVector`?
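
A hedged sketch of what the answer likely looks like, assuming the truncated doc above belongs to a `vectorTypes`-style hook (the method name and signature below are an assumption, not quoted from the PR). "The exact java types" would then be the fully qualified names of the concrete `ColumnVector` classes, so generated code can cast to them without virtual dispatch:

```
// Assumed signature; picks the concrete vector class per the off-heap setting.
override def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): Option[Seq[String]] =
  Option(Seq.fill(attributes.length)(
    if (conf.offHeapColumnVectorEnabled) {
      classOf[OffHeapColumnVector].getName
    } else {
      classOf[OnHeapColumnVector].getName
    }))
```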

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
##########
@@ -32,7 +32,7 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializabl
   val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes)
 }
 
-private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
+private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {

Review comment:
       ~Do we need this visibility change? For me, this is irrelevant to this PR. Can we remove this change from this PR? We had better make this change when it's inevitable.~ Oh, got it. Sorry, I was confused by the two `columnar` packages.
   
   ```
   org.apache.spark.sql.columnar
   org.apache.spark.sql.execution.columnar
   ```
   

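To make the package mix-up above concrete, a minimal illustration (hypothetical, simplified class name) of why the widened modifier is needed: `private[columnar]` only reaches the enclosing `...execution.columnar` package, while `private[sql]` also covers the new `org.apache.spark.sql.columnar` package.

```
package org.apache.spark.sql {
  package execution.columnar {
    // A private[columnar] member here would be invisible outside
    // org.apache.spark.sql.execution.columnar; private[sql] is visible to
    // every package under org.apache.spark.sql.
    private[sql] class PartitionStatisticsLike   // hypothetical stand-in
  }
  package columnar {
    class SerializerSketch {
      val stats = new execution.columnar.PartitionStatisticsLike
    }
  }
}
```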
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
+
+/**
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Can `convertForCacheColumnar()` be called instead of `convertForCache()` for this given
+   * schema? True if it can and false if it cannot. Columnar input is only supported if the
+   * plan could produce columnar output. Currently this is mostly supported by input formats
+   * like parquet and orc, but more operations are likely to be supported soon.
+   *
+   * @param schema the schema of the data being stored.
+   * @return True if columnar input can be supported, else false.
+   */
+  def supportsColumnarInput(schema: Seq[Attribute]): Boolean
+
+  /**
+   * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCache(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch]
+
+  /**
+   * Convert an `RDD[ColumnarBatch]` into an `RDD[CachedBatch]` in preparation for caching the data.
+   * This will only be called if `supportsColumnarInput()` returned true for the given schema and
+   * the plan up to this point could produce columnar output without modifying it.
+   * @param input the input `RDD` to be converted.
+   * @param schema the schema of the data being stored.
+   * @param storageLevel where the data will be stored.
+   * @param conf the config for the query.
+   * @return The data converted into a format more suitable for caching.
+   */
+  def convertForCacheColumnar(

Review comment:
       Although I know this is designed to add a `Columnar` suffix to the `convertForCache` function name, could we use `convertForColumnarCache` instead? To me, it sounds more natural, given that we already use `supportsColumnarInput`.
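
Whichever name wins, here is a hedged sketch of the dispatch the two methods imply for the caller (inferred from the doc comments above, not the PR's actual wiring; `serializer`, `plan`, `schema`, `storageLevel`, and `conf` are assumed to be in scope):

```
val cached: RDD[CachedBatch] =
  if (serializer.supportsColumnarInput(schema) && plan.supportsColumnar) {
    // Columnar path: only legal when the serializer accepts ColumnarBatch
    // input and the child plan can actually produce columnar output.
    serializer.convertForCacheColumnar(plan.executeColumnar(), schema, storageLevel, conf)
  } else {
    // Row-based path: must always be supported.
    serializer.convertForCache(plan.execute(), schema, storageLevel, conf)
  }
```

Presumably a third-party serializer would then be selected through a static SQL conf (the `StaticSQLConf` import in the first hunk above hints at that), though the exact key is for this PR to define.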




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org