Posted to dev@spark.apache.org by Micah Kornfield <em...@gmail.com> on 2020/06/28 04:27:28 UTC

Contract for PartitionReader/InputPartition for ColumnarBatch?

Hello spark-dev,

Looking at ColumnarBatch [1] it seems to indicate a single object is meant
to be used for the entire loading process.

Does this imply that Spark assumes the ColumnarBatch and any direct
references to ColumnarBatch (e.g. UTF8Strings) returned by
InputPartitionReader/PartitionReader [2][3] get invalidated after "next()"
is called on the Reader?

Does the same apply to InternalRow?

Does it make sense to update the contracts one way or another? (I'm happy to
make a PR.)

Thanks,
Micah

[1]
https://github.com/apache/spark/blob/c341de8b3e1f1d3327bd4ae3b0d2ec048f64d306/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
[2]
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
[3]
https://github.com/apache/spark/blob/a5efbb284e29b1d879490a4ee2c9fa08acec42b0/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java

Re: Contract for PartitionReader/InputPartition for ColumnarBatch?

Posted by Bobby Evans <bo...@apache.org>.
Micah,

You are correct. The contract for processing ColumnarBatches is that the
code that produced the batch is responsible for closing it and
anything downstream of it cannot keep any references to it. This is just
like with UnsafeRow.  If an UnsafeRow is cached, like for aggregates or
sorts, it must be copied into a separate memory buffer. This does not lend
itself to efficient memory management when doing columnar processing, but
for the intended purpose of loading columnar data and then instantly
turning it into rows, it works fine.
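
To illustrate why a downstream consumer must copy anything it wants to keep, here is a minimal, self-contained sketch. It does not use Spark classes: `ReusingReader` is a hypothetical stand-in for a reader that hands back the same mutable object on every call, and the `int[]` row stands in for a ColumnarBatch or UnsafeRow.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusedBufferDemo {
    /** Hypothetical reader that reuses one mutable row object for every record. */
    static final class ReusingReader {
        private final int[] row = new int[1]; // single buffer, overwritten in place
        private final int count;
        private int next = 0;

        ReusingReader(int count) {
            this.count = count;
        }

        /** Advances to the next record, overwriting the previous one. */
        boolean next() {
            if (next >= count) {
                return false;
            }
            row[0] = next++;
            return true;
        }

        /** Returns the same object on every call. */
        int[] get() {
            return row;
        }
    }

    /** Incorrect: stores references to the reused buffer. */
    static List<int[]> collectWithoutCopy(int n) {
        ReusingReader reader = new ReusingReader(n);
        List<int[]> out = new ArrayList<>();
        while (reader.next()) {
            out.add(reader.get());
        }
        return out;
    }

    /** Correct: copies each record before next() is called again. */
    static List<int[]> collectWithCopy(int n) {
        ReusingReader reader = new ReusingReader(n);
        List<int[]> out = new ArrayList<>();
        while (reader.next()) {
            out.add(reader.get().clone());
        }
        return out;
    }

    public static void main(String[] args) {
        // Without a copy, every stored element aliases the last record read.
        System.out.println(collectWithoutCopy(3).get(0)[0]); // prints 2
        // With a copy, the first stored element keeps its original value.
        System.out.println(collectWithCopy(3).get(0)[0]);    // prints 0
    }
}
```

The copy is the cost a sort or aggregate pays for holding data past the next call to next().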

Any change to this contract would require performance testing.  This is
because several of the input formats are written to reuse the batch/memory
buffer. Spark is configured by default to keep the batch size small so that
the batch can fit in the CPU cache.  A change to the contract would
potentially mean a lot of object churn for GC to handle, or some possibly
complex code to do reference counting and memory reuse.
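
As a rough sketch of what reference counting with memory reuse could look like, and of where the extra allocations would come from, consider the following. None of this is Spark code: `BufferPool`, `Buffer`, `retain()` and `release()` are hypothetical names, and the sketch is single-threaded for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RefCountedPoolDemo {
    /** Hypothetical pool that recycles buffers once nobody references them. */
    static final class BufferPool {
        private final Deque<Buffer> free = new ArrayDeque<>();
        private final int capacity;
        int allocations = 0; // fresh allocations, tracked for illustration

        BufferPool(int capacity) {
            this.capacity = capacity;
        }

        Buffer acquire() {
            Buffer b = free.poll();
            if (b == null) {
                b = new Buffer(this, capacity);
                allocations++;
            }
            b.refCount = 1; // the producer holds the first reference
            return b;
        }
    }

    /** Hypothetical reference-counted buffer; not thread-safe. */
    static final class Buffer {
        final int[] data;
        private final BufferPool pool;
        private int refCount;

        Buffer(BufferPool pool, int capacity) {
            this.pool = pool;
            this.data = new int[capacity];
        }

        /** A consumer that wants to keep the buffer takes its own reference. */
        Buffer retain() {
            if (refCount <= 0) {
                throw new IllegalStateException("buffer already released");
            }
            refCount++;
            return this;
        }

        /** Drops one reference; the last release returns the buffer to the pool. */
        void release() {
            if (refCount <= 0) {
                throw new IllegalStateException("buffer already released");
            }
            if (--refCount == 0) {
                pool.free.push(this);
            }
        }
    }

    /** Produces `batches` buffers and reports how many had to be freshly allocated. */
    static int allocationsFor(int batches, boolean consumerKeepsBatch) {
        BufferPool pool = new BufferPool(4);
        List<Buffer> kept = new ArrayList<>();
        for (int i = 0; i < batches; i++) {
            Buffer b = pool.acquire();
            if (consumerKeepsBatch) {
                kept.add(b.retain()); // consumer holds on past the next batch
            }
            b.release(); // producer is done with this batch
        }
        for (Buffer b : kept) {
            b.release(); // consumer eventually lets go
        }
        return pool.allocations;
    }

    public static void main(String[] args) {
        System.out.println(allocationsFor(5, false)); // prints 1: buffer is reused
        System.out.println(allocationsFor(5, true));  // prints 5: one per batch
    }
}
```

When no consumer retains a batch, the pool behaves like today's single reused buffer; each retained batch forces a new allocation, which is the churn described above.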

I personally would prefer to see this change because we are doing columnar
processing with a lot of transformations that we don't want to keep memory
statically allocated for. In our plugin, we have the consumer be
responsible for closing the incoming batch, but our batch sizes are a lot
larger so GC pressure is less of an issue. The only thing for us is that we
have to manage the transition between the spark columnar model and our
plugin's internal columnar model.  Not a big deal though.

Thanks,

Bobby
