Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/10 20:15:48 UTC

[GitHub] [spark] tgravescs commented on a change in pull request #29067: [SPARK-32274] Make SQL cache serialization pluggable

tgravescs commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r453009530



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
##########
@@ -79,46 +77,14 @@ case class InMemoryTableScanExec(
     }) && !WholeStageCodegenExec.isTooManyFields(conf, relation.schema)
   }
 
-  private val columnIndices =
-    attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
-
-  private val relationSchema = relation.schema.toArray
-
-  private lazy val columnarBatchSchema = new StructType(columnIndices.map(i => relationSchema(i)))
-
-  private def createAndDecompressColumn(
-      cachedColumnarBatch: CachedBatch,
-      offHeapColumnVectorEnabled: Boolean): ColumnarBatch = {
-    val rowCount = cachedColumnarBatch.numRows
-    val taskContext = Option(TaskContext.get())
-    val columnVectors = if (!offHeapColumnVectorEnabled || taskContext.isEmpty) {
-      OnHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
-    } else {
-      OffHeapColumnVector.allocateColumns(rowCount, columnarBatchSchema)
-    }
-    val columnarBatch = new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]])
-    columnarBatch.setNumRows(rowCount)
-
-    for (i <- attributes.indices) {
-      ColumnAccessor.decompress(
-        cachedColumnarBatch.buffers(columnIndices(i)),
-        columnarBatch.column(i).asInstanceOf[WritableColumnVector],
-        columnarBatchSchema.fields(i).dataType, rowCount)
-    }
-    taskContext.foreach(_.addTaskCompletionListener[Unit](_ => columnarBatch.close()))
-    columnarBatch
-  }
-
   private lazy val columnarInputRDD: RDD[ColumnarBatch] = {
     val numOutputRows = longMetric("numOutputRows")
     val buffers = filteredCachedBatches()
-    val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled
-    buffers
-      .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
-      .map { buffer =>
-        numOutputRows += buffer.numRows()
-        buffer
-      }
+    relation.cacheBuilder.serializer.decompressColumnar(buffers, relation.output, attributes, conf)
+        .map { cb =>

Review comment:
      nit: 2-space indentation

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.
+   */
+  def decompressColumnar(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Decompress the cached into back into [[InternalRow]]. If you want this to be performant code

Review comment:
      nit: should read "the cached batch back into"
   Also, I think a comma should be added after "performant".

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {

Review comment:
      We should mark this with @DeveloperApi and also add @Since("3.1.0").
   The same annotations should be added to all of the public interfaces you added.
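      For reference, a rough sketch of what the annotated declarations might look like, assuming the existing org.apache.spark.annotation annotations and that this change lands in 3.1.0:

          import org.apache.spark.annotation.{DeveloperApi, Since}

          /**
           * Basic interface that all cached batches of data must support.
           */
          @DeveloperApi
          @Since("3.1.0")
          trait CachedBatch {
            def numRows: Int
            def sizeInBytes: Long
          }

          @DeveloperApi
          @Since("3.1.0")
          trait CachedBatchSerializer extends Serializable {
            // ... methods as in this diff ...
          }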

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.
+   */
+  def decompressColumnar(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Decompress the cached into back into [[InternalRow]]. If you want this to be performant code
+   * generation is advised.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the rows that were stored in the cached batches.
+   */
+  def decompressToRows(input: RDD[CachedBatch],

Review comment:
      put the input parameter on the next line
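      For example, the declaration could use the usual multi-line parameter layout, each parameter on its own line with a 4-space continuation indent (a sketch based on the signature quoted above):

          def decompressToRows(
              input: RDD[CachedBatch],
              cacheAttributes: Seq[Attribute],
              selectedAttributes: Seq[Attribute],
              conf: SQLConf): RDD[InternalRow]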

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.
+   */
+  def decompressColumnar(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Decompress the cached into back into [[InternalRow]]. If you want this to be performant code
+   * generation is advised.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the rows that were stored in the cached batches.
+   */
+  def decompressToRows(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
 
-case class CachedRDDBuilder(
-    useCompression: Boolean,
-    batchSize: Int,
-    storageLevel: StorageLevel,
-    @transient cachedPlan: SparkPlan,
-    tableName: Option[String]) {
+/**
+ * A [[CachedBatch]] that stored some simple metrics that can be used for filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ */
+trait SimpleMetricsCachedBatch extends CachedBatch {
+  /**
+   * Holds the same as ColumnStats.
+   * upperBound (optional), lowerBound (Optional), nullCount: Int, rowCount: Int, sizeInBytes: Long
+   */
+  val stats: InternalRow
+  override def sizeInBytes: Long = stats.getLong(4)
+}
 
-  @transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
+// Currently, only use statistics from atomic types except binary type only.
+private object ExtractableLiteral {
+  def unapply(expr: Expression): Option[Literal] = expr match {
+    case lit: Literal => lit.dataType match {
+      case BinaryType => None
+      case _: AtomicType => Some(lit)
+      case _ => None
+    }
+    case _ => None
+  }
+}
 
-  val sizeInBytesStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
-  val rowCountStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
+/**
+ * Provides basic filtering for [[CachedBatchSerializer]] implementations.
+ */
+trait SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer with Logging {
+  override def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]):
+  (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {

Review comment:
      nit: the indentation is off here
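      A sketch of one way to lay this out so the continuation indent stays consistent (signature only; the body is unchanged from the diff):

          override def buildFilter(
              predicates: Seq[Expression],
              cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {
            val stats = new PartitionStatistics(cachedAttributes)
            // ... body unchanged from the diff ...
          }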

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.
+   */
+  def decompressColumnar(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Decompress the cached into back into [[InternalRow]]. If you want this to be performant code
+   * generation is advised.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the rows that were stored in the cached batches.
+   */
+  def decompressToRows(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
 
-case class CachedRDDBuilder(
-    useCompression: Boolean,
-    batchSize: Int,
-    storageLevel: StorageLevel,
-    @transient cachedPlan: SparkPlan,
-    tableName: Option[String]) {
+/**
+ * A [[CachedBatch]] that stored some simple metrics that can be used for filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ */
+trait SimpleMetricsCachedBatch extends CachedBatch {
+  /**
+   * Holds the same as ColumnStats.
+   * upperBound (optional), lowerBound (Optional), nullCount: Int, rowCount: Int, sizeInBytes: Long
+   */
+  val stats: InternalRow
+  override def sizeInBytes: Long = stats.getLong(4)
+}
 
-  @transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
+// Currently, only use statistics from atomic types except binary type only.
+private object ExtractableLiteral {
+  def unapply(expr: Expression): Option[Literal] = expr match {
+    case lit: Literal => lit.dataType match {
+      case BinaryType => None
+      case _: AtomicType => Some(lit)
+      case _ => None
+    }
+    case _ => None
+  }
+}
 
-  val sizeInBytesStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
-  val rowCountStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
+/**
+ * Provides basic filtering for [[CachedBatchSerializer]] implementations.
+ */
+trait SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer with Logging {
+  override def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]):
+  (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {
+    val stats = new PartitionStatistics(cachedAttributes)
+    val statsSchema = stats.schema
+
+    def statsFor(a: Attribute): ColumnStatisticsSchema = {
+      stats.forAttribute(a)
+    }
 
-  val cachedName = tableName.map(n => s"In-memory table $n")
-    .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))
+    // Returned filter predicate should return false iff it is impossible for the input expression
+    // to evaluate to `true` based on statistics collected about this partition batch.
+    @transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
+      case And(lhs: Expression, rhs: Expression)
+        if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
+        (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
+
+      case Or(lhs: Expression, rhs: Expression)
+        if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) =>
+        buildFilter(lhs) || buildFilter(rhs)
+
+      case EqualTo(a: AttributeReference, ExtractableLiteral(l)) =>
+        statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+      case EqualTo(ExtractableLiteral(l), a: AttributeReference) =>
+        statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+
+      case EqualNullSafe(a: AttributeReference, ExtractableLiteral(l)) =>
+        statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+      case EqualNullSafe(ExtractableLiteral(l), a: AttributeReference) =>
+        statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
+
+      case LessThan(a: AttributeReference, ExtractableLiteral(l)) => statsFor(a).lowerBound < l
+      case LessThan(ExtractableLiteral(l), a: AttributeReference) => l < statsFor(a).upperBound
+
+      case LessThanOrEqual(a: AttributeReference, ExtractableLiteral(l)) =>
+        statsFor(a).lowerBound <= l
+      case LessThanOrEqual(ExtractableLiteral(l), a: AttributeReference) =>
+        l <= statsFor(a).upperBound
+
+      case GreaterThan(a: AttributeReference, ExtractableLiteral(l)) => l < statsFor(a).upperBound
+      case GreaterThan(ExtractableLiteral(l), a: AttributeReference) => statsFor(a).lowerBound < l
+
+      case GreaterThanOrEqual(a: AttributeReference, ExtractableLiteral(l)) =>
+        l <= statsFor(a).upperBound
+      case GreaterThanOrEqual(ExtractableLiteral(l), a: AttributeReference) =>
+        statsFor(a).lowerBound <= l
+
+      case IsNull(a: Attribute) => statsFor(a).nullCount > 0
+      case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0
+
+      case In(a: AttributeReference, list: Seq[Expression])
+        if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty =>
+        list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
+            l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
+      // This is an example to explain how it works, imagine that the id column stored as follows:
+      // __________________________________________
+      // | Partition ID | lowerBound | upperBound |
+      // |--------------|------------|------------|
+      // |      p1      |    '1'     |    '9'     |
+      // |      p2      |    '10'    |    '19'    |
+      // |      p3      |    '20'    |    '29'    |
+      // |      p4      |    '30'    |    '39'    |
+      // |      p5      |    '40'    |    '49'    |
+      // |______________|____________|____________|
+      //
+      // A filter: df.filter($"id".startsWith("2")).
+      // In this case it substr lowerBound and upperBound:
+      // ________________________________________________________________________________________
+      // | Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) |
+      // |--------------|-----------------------------------|-----------------------------------|
+      // |      p1      |    '1'                            |    '9'                            |
+      // |      p2      |    '1'                            |    '1'                            |
+      // |      p3      |    '2'                            |    '2'                            |
+      // |      p4      |    '3'                            |    '3'                            |
+      // |      p5      |    '4'                            |    '4'                            |
+      // |______________|___________________________________|___________________________________|
+      //
+      // We can see that we only need to read p1 and p3.
+      case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
+        statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
+            l <= statsFor(a).upperBound.substr(0, Length(l))
+    }
 
-  def cachedColumnBuffers: RDD[CachedBatch] = {
-    if (_cachedColumnBuffers == null) {
-      synchronized {
-        if (_cachedColumnBuffers == null) {
-          _cachedColumnBuffers = buildBuffers()
-        }
+    // When we bind the filters we need to do it against the stats schema
+    val partitionFilters: Seq[Expression] = {
+      predicates.flatMap { p =>
+        val filter = buildFilter.lift(p)
+        val boundFilter =
+          filter.map(
+            BindReferences.bindReference(
+              _,
+              statsSchema,
+              allowFailures = true))
+
+        boundFilter.foreach(_ =>
+          filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f")))
+
+        // If the filter can't be resolved then we are missing required statistics.
+        boundFilter.filter(_.resolved)
       }
     }
-    _cachedColumnBuffers
-  }
 
-  def clearCache(blocking: Boolean = false): Unit = {
-    if (_cachedColumnBuffers != null) {
-      synchronized {
-        if (_cachedColumnBuffers != null) {
-          _cachedColumnBuffers.unpersist(blocking)
-          _cachedColumnBuffers = null
+    def ret(index: Int, cachedBatchIterator: Iterator[CachedBatch]): Iterator[CachedBatch] = {
+      val partitionFilter = Predicate.create(
+        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+        cachedAttributes)
+
+      partitionFilter.initialize(index)
+      val schemaIndex = cachedAttributes.zipWithIndex
+
+      cachedBatchIterator.filter { cb =>
+        val cachedBatch = cb.asInstanceOf[SimpleMetricsCachedBatch]
+        if (!partitionFilter.eval(cachedBatch.stats)) {
+          logDebug {
+            val statsString = schemaIndex.map { case (a, i) =>
+              val value = cachedBatch.stats.get(i, a.dataType)
+              s"${a.name}: $value"
+            }.mkString(", ")
+            s"Skipping partition based on stats $statsString"
+          }
+          false
+        } else {
+          true
         }
       }
     }
+

Review comment:
       remove extra newline
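      To make the shape of the proposed API concrete, here is a rough, hypothetical skeleton of a third-party serializer built on these traits. The class name is made up; the trait and method signatures are taken from the diff above and may still change before this merges:

          import org.apache.spark.rdd.RDD
          import org.apache.spark.sql.catalyst.InternalRow
          import org.apache.spark.sql.catalyst.expressions.Attribute
          import org.apache.spark.sql.execution.SparkPlan
          import org.apache.spark.sql.internal.SQLConf
          import org.apache.spark.sql.vectorized.ColumnarBatch

          class MyCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {

            // buildFilter is inherited: it prunes whole batches using the stats row that
            // each SimpleMetricsCachedBatch carries, so the batches produced here must
            // implement SimpleMetricsCachedBatch for that pruning to apply.
            override def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch] = {
              // Run the plan and encode each partition's rows into a custom CachedBatch.
              ???
            }

            override def decompressColumnar(
                input: RDD[CachedBatch],
                cacheAttributes: Seq[Attribute],
                selectedAttributes: Seq[Attribute],
                conf: SQLConf): RDD[ColumnarBatch] = {
              // Decode only the selected columns into ColumnarBatches.
              ???
            }

            override def decompressToRows(
                input: RDD[CachedBatch],
                cacheAttributes: Seq[Attribute],
                selectedAttributes: Seq[Attribute],
                conf: SQLConf): RDD[InternalRow] = {
              // Decode the selected columns back into InternalRows.
              ???
            }
          }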

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.

Review comment:
      nit: the @return description should read "the RDD of batches ..."

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.
+   */
+  def decompressColumnar(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Decompress the cached into back into [[InternalRow]]. If you want this to be performant code
+   * generation is advised.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the rows that were stored in the cached batches.
+   */
+  def decompressToRows(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow]
+}
 
-case class CachedRDDBuilder(
-    useCompression: Boolean,
-    batchSize: Int,
-    storageLevel: StorageLevel,
-    @transient cachedPlan: SparkPlan,
-    tableName: Option[String]) {
+/**
+ * A [[CachedBatch]] that stored some simple metrics that can be used for filtering of batches with

Review comment:
       nit s/stored/stores/

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.
+   */
+  def decompressColumnar(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch]
+
+  /**
+   * Decompress the cached into back into [[InternalRow]]. If you want this to be performant code
+   * generation is advised.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the rows that were stored in the cached batches.

Review comment:
      nit: the @return description should read "the RDD of rows ..."

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -119,35 +323,168 @@ case class CachedRDDBuilder(
             rowCount += 1
           }
 
-          sizeInBytesStats.add(totalSize)
-          rowCountStats.add(rowCount)
-
           val stats = InternalRow.fromSeq(
             columnBuilders.flatMap(_.columnStats.collectedStatistics))
-          CachedBatch(rowCount, columnBuilders.map { builder =>
+          DefaultCachedBatch(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)
         }
 
         def hasNext: Boolean = rowIterator.hasNext
       }
-    }.persist(storageLevel)
+    }
+  }
+
+  override def decompressColumnar(input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch] = {
+

Review comment:
       remove extra newline

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -119,35 +323,168 @@ case class CachedRDDBuilder(
             rowCount += 1
           }
 
-          sizeInBytesStats.add(totalSize)
-          rowCountStats.add(rowCount)
-
           val stats = InternalRow.fromSeq(
             columnBuilders.flatMap(_.columnStats.collectedStatistics))
-          CachedBatch(rowCount, columnBuilders.map { builder =>
+          DefaultCachedBatch(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)
         }
 
         def hasNext: Boolean = rowIterator.hasNext
       }
-    }.persist(storageLevel)
+    }
+  }
+
+  override def decompressColumnar(input: RDD[CachedBatch],

Review comment:
      put the input parameter on the next line

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,288 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to a implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
+   * data columns, depending on their data type. Note that this is intended to skip batches
+   * that are not needed, and the actual filtering of individual rows is handled later.
+   * @param predicates the set of expressions to use for filtering.
+   * @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
+   *                         if you don't store it with the data.
+   * @return a function that takes the partition id and the iterator of batches in the partition.
+   *         It returns an iterator of batches that should be loaded.
+   */
+  def buildFilter(predicates: Seq[Expression],
+      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]
+
+  /**
+   * Decompress the cached data into a ColumnarBatch. This currently is only used for basic types
+   * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType
+   * That may change in the future.
+   * @param input the cached batches that should be decompressed.
+   * @param cacheAttributes the attributes of the data in the batch.
+   * @param selectedAttributes the field that should be loaded from the data, and the order they
+   *                           should appear in the output batch.
+   * @param conf the configuration for the job.
+   * @return the batches in the ColumnarBatch format.
+   */
+  def decompressColumnar(input: RDD[CachedBatch],

Review comment:
       nit: put input on the next line




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org