Posted to issues@spark.apache.org by "jiangyu (Jira)" <ji...@apache.org> on 2019/08/22 09:38:00 UTC
[jira] [Comment Edited] (SPARK-28482) Data incomplete when using
pandas udf in Python 3
[ https://issues.apache.org/jira/browse/SPARK-28482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16913183#comment-16913183 ]
jiangyu edited comment on SPARK-28482 at 8/22/19 9:37 AM:
----------------------------------------------------------
Hi [~bryanc], you may need to produce more data, for example 100,000 rows, and read 10,000 rows per iteration. The row count is then wrong: it is smaller than expected.
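For reference, the number of batches to expect can be worked out with a small helper. This is only a sketch: the function name is hypothetical, and it assumes all rows sit in a single partition and that each batch holds at most the configured number of records.

```python
def expected_batches(total_rows, max_records_per_batch):
    """Batches needed for one partition (hypothetical helper, not a Spark API)."""
    if max_records_per_batch <= 0:
        raise ValueError("max_records_per_batch must be positive")
    # Ceiling division using integers only.
    return -(-total_rows // max_records_per_batch)


# 100,000 rows read 10,000 at a time
print(expected_batches(100000, 10000))
```

If the UDF is invoked fewer times than this, some batches never reached the Python side.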
I investigated this issue this week and found that the row count is correct when Arrow reads from the socket. So, in serializers.py, I revised dump_stream to write to a local stream instead of the socket stream:
{code:python}
def dump_stream(self, iterator, stream):
    """
    Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
    a list of series accompanied by an optional pyarrow type to coerce the data to.
    """
    import pyarrow as pa
    writer = None
    local_stream = pa.output_stream('/tmp/output')
    try:
        for series in iterator:
            batch = _create_batch(series, self._timezone)
            if writer is None:
                # write_int(SpecialLengths.START_ARROW_STREAM, stream)
                # writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                write_int(SpecialLengths.START_ARROW_STREAM, local_stream)
                writer = pa.RecordBatchStreamWriter(local_stream, batch.schema)
            writer.write_batch(batch)
    finally:
        if writer is not None:
            writer.close()
{code}
The row count is correct, and no exception is thrown.
Then I changed daemon.py to increase the buffer size of outfile from 65536 to 655360000.
{code:python}
def worker(sock, authenticated):
    """
    Called by a worker process after the fork().
    """
    signal.signal(SIGHUP, SIG_DFL)
    signal.signal(SIGCHLD, SIG_DFL)
    signal.signal(SIGTERM, SIG_DFL)
    # restore the handler for SIGINT,
    # it's useful for debugging (show the stacktrace before exit)
    signal.signal(SIGINT, signal.default_int_handler)
    # Read the socket using fdopen instead of socket.makefile() because the latter
    # seems to be very slow; note that we need to dup() the file descriptor because
    # otherwise writes also cause a seek that makes us miss data on the read side.
    infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", 655360000)
{code}
With that change everything works. I don't know whether it is safe to increase the buffer size that much, but it really helps us.
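To make clear what the buffer size argument controls, here is a small standalone sketch. It is not Spark code, and it does not explain the missing rows. It only shows that a writer built the same way as outfile holds written bytes in user space until the buffer fills or flush() is called. The helper names are hypothetical, and a local socket pair stands in for the JVM connection.

```python
import os
import socket


def buffered_writer(sock, buffer_size):
    # Same construction as outfile in daemon.py: dup the descriptor and
    # wrap it in a buffered binary writer.
    return os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)


def bytes_visible_before_and_after_flush(buffer_size, payload):
    """Return (bytes readable before flush, bytes readable after flush)."""
    left, right = socket.socketpair()
    right.setblocking(False)
    outfile = buffered_writer(left, buffer_size)
    try:
        outfile.write(payload)  # smaller than the buffer, so it stays buffered
        try:
            before = right.recv(len(payload))
        except BlockingIOError:
            before = b""  # nothing has reached the socket yet
        outfile.flush()  # pushes the buffered bytes to the socket
        after = right.recv(len(payload))
        return len(before), len(after)
    finally:
        outfile.close()
        left.close()
        right.close()


print(bytes_visible_before_and_after_flush(65536, b"x" * 1000))
```

With a buffer of 655360000 bytes, far more output is held in the worker process before anything is sent, which changes when writes hit the socket and costs memory per worker.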
> Data incomplete when using pandas udf in Python 3
> -------------------------------------------------
>
> Key: SPARK-28482
> URL: https://issues.apache.org/jira/browse/SPARK-28482
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.3, 2.4.3
> Environment: centos 7.4
> pyarrow 0.10.0 0.14.0
> python 2.7 3.5 3.6
> Reporter: jiangyu
> Priority: Major
> Attachments: py2.7.png, py3.6.png, test.csv, test.py, worker.png
>
>
> Hi,
>
> Pandas UDFs, introduced in Spark 2.3.x, use Arrow to serialize data between the JVM and Python. However, we see an issue with Python >= 3.5.x.
> We use a pandas UDF to process batches of data, but the data is incomplete under Python 3.x. At first I thought our processing logic was wrong, so I reduced the code to a very simple case, and it has the same problem. After investigating for a week, I found it is related to pyarrow.
>
> *Reproduce procedure:*
> 1. Prepare data
> The data has seven columns, a, b, c, d, e, f and g, all of type Integer:
> a,b,c,d,e,f,g
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> Produce 100,000 rows, name the file test.csv, upload it to HDFS, then load it and repartition it into 1 partition.
>
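> {code:python}
> from pyspark import SparkContext
> from pyspark.sql.functions import col
> df = spark.read.format('csv').option("header", "true").load('/test.csv')
> # Cast every column to int
> df = df.select(*(col(c).cast("int").alias(c) for c in df.columns))
> df = df.repartition(1)
> spark_context = SparkContext.getOrCreate()
> {code}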
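The test file described in step 1 could be generated with a short script along these lines. This is a sketch, not the attached test.csv: the function name is hypothetical, and the row count is left as a parameter.

```python
import csv
import io


def make_test_csv(num_rows):
    """Return CSV text with header a..g and num_rows identical rows 1..7."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["a", "b", "c", "d", "e", "f", "g"])
    for _ in range(num_rows):
        writer.writerow([1, 2, 3, 4, 5, 6, 7])
    return buf.getvalue()


# Print a two-row sample of the generated text.
print(make_test_csv(2), end="")
```

Writing the returned text to test.csv and uploading it to HDFS gives the input for the load shown above.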
>
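> 2. Register the pandas UDF
>
> {code:python}
> from pyspark.sql.functions import col, pandas_udf
> from pyspark.sql.types import IntegerType
> def add_func(a, b, c, d, e, f, g):
>     # Printed once per batch passed to the UDF
>     print('iterator one time')
>     return a
> add = pandas_udf(add_func, returnType=IntegerType())
> df_result = df.select(add(col("a"), col("b"), col("c"), col("d"), col("e"), col("f"), col("g")))
> {code}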
>
> 3. Apply the pandas UDF
>
> {code:python}
> def trigger_func(iterator):
>     yield iterator
> df_result.rdd.foreachPartition(trigger_func)
> {code}
>
> 4. Execute it in pyspark (local or YARN)
> Run it with the conf spark.sql.execution.arrow.maxRecordsPerBatch=100000. As mentioned before, the total row number is 1000000, so it should print "iterator one time" 10 times.
> (1) Python 2.7 envs:
>
> {code:java}
> PYSPARK_PYTHON=/usr/lib/conda/envs/py2.7/bin/python pyspark --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf spark.executor.pyspark.memory=2g --conf spark.sql.execution.arrow.enabled=true --executor-cores 1{code}
>
> !py2.7.png!
> The result is correct: the message is printed 10 times.
>
>
> (2) Python 3.5 or 3.6 envs:
> {code:java}
> PYSPARK_PYTHON=/usr/lib/conda/envs/python3.6/bin/python pyspark --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf spark.executor.pyspark.memory=2g --conf spark.sql.execution.arrow.enabled=true --executor-cores{code}
>
> !py3.6.png!
> The data is incomplete. The exception shown is printed by the Spark JVM side through logging that we added; I explain it below.
>
>
> h3. *Investigation*
> The "process done" message is added in worker.py.
> !worker.png!
> To capture the exception, we changed the Spark code in core/src/main/scala/org/apache/spark/util/Utils.scala and added the following lines to print it.
>
>
> {code:java}
> @@ -1362,6 +1362,8 @@ private[spark] object Utils extends Logging {
>        case t: Throwable =>
>          // Purposefully not using NonFatal, because even fatal exceptions
>          // we don't want to have our finallyBlock suppress
> +        logInfo(t.getLocalizedMessage)
> +        t.printStackTrace()
>          originalThrowable = t
>          throw originalThrowable
>      } finally {
> {code}
>
>
> It seems that PySpark gets the data from the JVM, but pyarrow receives it incomplete. The pyarrow side thinks the data is finished and shuts down the socket. Meanwhile, the JVM side is still writing to the same socket and gets a socket-closed exception.
> The pyarrow part is in ipc.pxi:
>
> {code:python}
> cdef class _RecordBatchReader:
>     cdef:
>         shared_ptr[CRecordBatchReader] reader
>         shared_ptr[InputStream] in_stream
>     cdef readonly:
>         Schema schema
>     def __cinit__(self):
>         pass
>     def _open(self, source):
>         get_input_stream(source, &self.in_stream)
>         with nogil:
>             check_status(CRecordBatchStreamReader.Open(
>                 self.in_stream.get(), &self.reader))
>         self.schema = pyarrow_wrap_schema(self.reader.get().schema())
>     def __iter__(self):
>         while True:
>             yield self.read_next_batch()
>     def get_next_batch(self):
>         import warnings
>         warnings.warn('Please use read_next_batch instead of '
>                       'get_next_batch', FutureWarning)
>         return self.read_next_batch()
>     def read_next_batch(self):
>         """
>         Read next RecordBatch from the stream. Raises StopIteration at end of
>         stream
>         """
>         cdef shared_ptr[CRecordBatch] batch
>         with nogil:
>             check_status(self.reader.get().ReadNext(&batch))
>         if batch.get() == NULL:
>             raise StopIteration
>         return pyarrow_wrap_batch(batch)
> {code}
>
> read_next_batch gets NULL and concludes that the iterator is finished.
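The effect of that end-of-stream check can be illustrated with a pure-Python analogy. This is not pyarrow's implementation: the BatchReader class and count_rows helper are hypothetical, and the None placed mid-stream merely stands in for whatever makes the reader see a null batch early, which the issue does not establish.

```python
class BatchReader:
    """Analogy of a reader that treats a null batch as end of stream."""

    def __init__(self, source):
        self._source = iter(source)

    def read_next_batch(self):
        batch = next(self._source, None)
        if batch is None:
            # Same decision as the NULL check: stop iterating, raise no error.
            raise StopIteration
        return batch

    def __iter__(self):
        return self

    __next__ = read_next_batch


def count_rows(batches):
    return sum(len(batch) for batch in batches)


# Three batches of two rows each, with a null batch before the last one.
stream = [[1, 2], [3, 4], None, [5, 6]]
print(count_rows(BatchReader(stream)))
```

The consumer sees a normal end of iteration and a smaller row count, with no exception on the Python side, which matches the symptom reported here.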
>
> h3. *RESULT*
> Our environment is Spark 2.4.3. We have tried pyarrow 0.10.0 and 0.14.0 with Python 2.7, 3.5 and 3.6.
> With Python 2.7 everything is fine, but with Python 3.5 or 3.6 the data is wrong.
> The number of columns is critical to triggering this bug: with fewer than 5 columns it will probably not happen, but with many columns, for example 7 or more, it happens every time.
> So we wonder whether there is some conflict between Python 3.x and the pyarrow version.
> I have attached the code and data.