Posted to issues@spark.apache.org by "Tanveer (Jira)" <ji...@apache.org> on 2020/08/12 10:24:00 UTC

[jira] [Created] (SPARK-32601) Issue in converting an RDD of Arrow RecordBatches in v3.0.0

Tanveer created SPARK-32601:
-------------------------------

             Summary: Issue in converting an RDD of Arrow RecordBatches in v3.0.0
                 Key: SPARK-32601
                 URL: https://issues.apache.org/jira/browse/SPARK-32601
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.0
            Reporter: Tanveer
             Fix For: 2.3.4


The following simple code snippet for converting an RDD of Arrow RecordBatches works perfectly in Spark v2.3.4.

 
{code:python}
from pyspark.sql import SparkSession
import pyspark
import pyarrow as pa
from pyspark.serializers import ArrowSerializer

def _arrow_record_batch_dumps(rb):
    # Fix for interoperability between pyarrow version >=0.15 and Spark's arrow version
    # Streaming message protocol has changed, remove setting when upgrading spark.
    import os
    os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'
    
    return bytearray(ArrowSerializer().dumps(rb))


def rb_return(ardd):
    data = [
        pa.array(range(5), type='int16'),
        pa.array([-10, -5, 0, None, 10], type='int32')
    ]
    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    return pa.RecordBatch.from_arrays(data, schema=schema)

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    sc = spark.sparkContext

    ardd = spark.sparkContext.parallelize([0,1,2], 3)
    ardd = ardd.map(rb_return)

    from pyspark.sql.types import from_arrow_schema
    from pyspark.sql.dataframe import DataFrame
    from pyspark.serializers import ArrowSerializer, PickleSerializer, AutoBatchedSerializer

    # Filter out and cache arrow record batches 
    ardd = ardd.filter(lambda x: isinstance(x, pa.RecordBatch)).cache()

    ardd = ardd.map(_arrow_record_batch_dumps)

    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    schema = from_arrow_schema(schema)

    jrdd = ardd._to_java_object_rdd()
    jdf = spark._jvm.PythonSQLUtils.arrowPayloadToDataFrame(jrdd, schema.json(), spark._wrapped._jsqlContext)
    df = DataFrame(jdf, spark._wrapped)
    df._schema = schema

    df.show()
{code}
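For reference, the payload bytes produced by _arrow_record_batch_dumps can be sanity-checked on the driver with plain pyarrow before they are handed to the JVM. A minimal sketch, assuming the payload is the complete Arrow file format that ArrowSerializer wrote in 2.3.x (the helper name is illustrative, not part of the snippet above):
{code:python}
import pyarrow as pa

def check_payload(payload_bytes):
    # Illustrative helper: read the serialized bytes back as an Arrow file
    # to confirm the Python side produced a well-formed record batch.
    reader = pa.ipc.open_file(pa.BufferReader(payload_bytes))
    batch = reader.get_batch(0)
    print(batch.schema)
    print(batch.to_pydict())
{code}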
 

But after updating Spark to v3.0.0, the same code no longer works, even with arrowPayloadToDataFrame() replaced by toDataFrame().

 
{code:python}
from pyspark.sql import SparkSession
import pyspark
import pyarrow as pa
#from pyspark.serializers import ArrowSerializer

def dumps(batch):
    import pyarrow as pa
    import io
    sink = io.BytesIO()
    writer = pa.RecordBatchFileWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    return sink.getvalue()

def _arrow_record_batch_dumps(rb):
    # Fix for interoperability between pyarrow version >=0.15 and Spark's arrow version
    # Streaming message protocol has changed, remove setting when upgrading spark.
    #import os
    #os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'
    #return bytearray(ArrowSerializer().dumps(rb))
    return bytearray(dumps(rb))

def rb_return(ardd):
    data = [
        pa.array(range(5), type='int16'),
        pa.array([-10, -5, 0, None, 10], type='int32')
    ]
    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    return pa.RecordBatch.from_arrays(data, schema=schema)

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("Python Arrow-in-Spark example") \
        .getOrCreate()

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    sc = spark.sparkContext

    ardd = spark.sparkContext.parallelize([0,1,2], 3)
    ardd = ardd.map(rb_return)

    from pyspark.sql.pandas.types import from_arrow_schema
    from pyspark.sql.dataframe import DataFrame

    # Filter out and cache arrow record batches
    ardd = ardd.filter(lambda x: isinstance(x, pa.RecordBatch)).cache()

    ardd = ardd.map(_arrow_record_batch_dumps)

    schema = pa.schema([pa.field('c0', pa.int16()),
                        pa.field('c1', pa.int32())],
                       metadata={b'foo': b'bar'})
    schema = from_arrow_schema(schema)

    jrdd = ardd._to_java_object_rdd()
    #jdf = spark._jvm.PythonSQLUtils.arrowPayloadToDataFrame(jrdd, schema.json(), spark._wrapped._jsqlContext)
    jdf = spark._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(), spark._wrapped._jsqlContext)
    df = DataFrame(jdf, spark._wrapped)
    df._schema = schema

    df.show()
{code}
First it fails with a Java heap space error:
{code}
20/08/12 12:18:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:336)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.loadBatch(ArrowConverters.scala:189)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.nextBatch(ArrowConverters.scala:165)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.<init>(ArrowConverters.scala:144)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromBatchIterator(ArrowConverters.scala:143)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.$anonfun$toDataFrame$1(ArrowConverters.scala:203)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$$Lambda$1806/1325557847.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.RDD$$Lambda$1805/889467051.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$1773/1728811302.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/08/12 12:18:48 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0,5,main]
java.lang.OutOfMemoryError: Java heap space
{code}
And when running with --driver-memory 4g, even though the data is tiny, it gives:
{code}
20/08/12 12:22:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Unexpected end of stream trying to read message.
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:671)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:336)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.loadBatch(ArrowConverters.scala:189)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.nextBatch(ArrowConverters.scala:165)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.<init>(ArrowConverters.scala:144)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.fromBatchIterator(ArrowConverters.scala:143)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.$anonfun$toDataFrame$1(ArrowConverters.scala:203)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/08/12 12:22:18 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, int2.bullx, executor driver): java.io.IOException: Unexpected end of stream trying to read message.
{code}
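For what it's worth, reading ArrowConverters in Spark 3.0 suggests toDataFrame() expects each RDD element to be a single encapsulated Arrow record batch message (no ARROW1 file magic, no schema message), rather than a complete Arrow file as written by RecordBatchFileWriter; a misread message length would be consistent with both the huge heap allocation and the "Unexpected end of stream" above. A minimal, untested sketch of the dumps function under that assumption:
{code:python}
import pyarrow as pa

def _arrow_record_batch_dumps(rb):
    # Assumption: Spark 3.0's ArrowConverters.loadBatch deserializes each byte
    # array as one encapsulated IPC record batch message, which is what
    # RecordBatch.serialize() produces with pyarrow >= 0.15.
    return bytearray(rb.serialize())
{code}
If that assumption is wrong, it would still help to document which byte layout toDataFrame() expects in 3.0.0.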


