Posted to issues@arrow.apache.org by "jiangyu (JIRA)" <ji...@apache.org> on 2019/07/23 08:13:00 UTC

[jira] [Updated] (ARROW-6011) Data incomplete when using pyarrow in pyspark in python 3.x

     [ https://issues.apache.org/jira/browse/ARROW-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jiangyu updated ARROW-6011:
---------------------------
    Attachment: py3.6.png

> Data incomplete when using pyarrow in pyspark in python 3.x
> -----------------------------------------------------------
>
>                 Key: ARROW-6011
>                 URL: https://issues.apache.org/jira/browse/ARROW-6011
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Java, Python
>    Affects Versions: 0.10.0, 0.14.0
>         Environment: CentOS 7.4, pyarrow 0.10.0 / 0.14.0, Python 2.7 / 3.5 / 3.6
>            Reporter: jiangyu
>            Priority: Major
>         Attachments: image-2019-07-23-16-06-49-889.png, py3.6.png
>
>
> Hi,
>  
> In Spark 2.3, pandas UDFs were added to PySpark, with pyarrow as the default serialization and deserialization method. It is a great feature, and we use it a lot.
> But when we change the default Python version from 2.7 to 3.5 or 3.6 (with conda as the Python environment manager), we encounter a fatal problem.
> We use a pandas UDF to process batches of data, but we find the data is incomplete. At first I thought our processing logic might be wrong, so I changed the code to a very simple one and it had the same problem. After investigating for a week, I found it is related to pyarrow.
>  
> h3. How to reproduce
>  
> 1. Generate data
> First, generate a very simple dataset with seven integer columns, a, b, c, d, e, f and g; every row is identical.
> a,b,c,d,e,f,g
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
> 1,2,3,4,5,6,7
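>  
> A minimal sketch (plain Python, for example) of one way to generate such a file with 1,000,000 identical rows:
>  
> # write 1,000,000 identical rows with seven integer columns to test.csv
> with open('test.csv', 'w') as f:
>     f.write('a,b,c,d,e,f,g\n')        # header row
>     for _ in range(1000000):
>         f.write('1,2,3,4,5,6,7\n')    # every row is the same
>  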
>  We produce 1,000,000 rows this way, name the file test.csv and upload it to HDFS, then load it and repartition it to 1 partition:
>  
> # run inside the pyspark shell, where the SparkSession `spark` is predefined
> from pyspark import SparkContext
> from pyspark.sql.functions import col
>  
> df = spark.read.format('csv').option("header", "true").load('/test.csv')
> df = df.select(*(col(c).cast("int").alias(c) for c in df.columns))  # cast every column to int
> df = df.repartition(1)
> spark_context = SparkContext.getOrCreate()
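>  
> As a quick sanity check, something like the following confirms the row count and the single partition before running the UDF:
>  
> df.count()                   # expect 1,000,000 rows
> df.rdd.getNumPartitions()    # expect 1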
>  
> 2. Register the pandas UDF
> Make a very simple pandas UDF function and register it. The function just prints “iterator one time” and returns its first argument unchanged.
>  
> from pyspark.sql.functions import pandas_udf, col
> from pyspark.sql.types import IntegerType
>  
> def add_func(a, b, c, d, e, f, g):
>     print('iterator one time')   # printed once per Arrow record batch
>     return a
>  
> add = pandas_udf(add_func, returnType=IntegerType())
> df_result = df.select(add(col("a"), col("b"), col("c"), col("d"), col("e"), col("f"), col("g")))
>  
> 3. Trigger a Spark action
>  
> def trigger_func(iterator):
>     yield iterator
>  
> df_result.rdd.foreachPartition(trigger_func)  # forces every partition, and hence the pandas UDF, to be evaluated
>  
> 4. Execute it in pyspark (local or yarn)
> We set spark.sql.execution.arrow.maxRecordsPerBatch=100000 and the data has 1,000,000 rows, so it should print “iterator one time” 10 times (1,000,000 rows / 100,000 rows per batch = 10 batches).
> (1) Here is the result in the Python 2.7 environment:
>  
> PYSPARK_PYTHON=/usr/lib/conda/envs/py2.7/bin/python pyspark --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf spark.executor.pyspark.memory=2g --conf spark.sql.execution.arrow.enabled=true --executor-cores 1
>  
> !image-2019-07-23-16-06-49-889.png!  
> The result is correct: 10 prints.
> (2) Then change to the Python 3.6 environment, with the same code.
> PYSPARK_PYTHON=/usr/lib/conda/envs/python3.6/bin/python pyspark --conf spark.sql.execution.arrow.maxRecordsPerBatch=100000 --conf spark.executor.pyspark.memory=2g --conf spark.sql.execution.arrow.enabled=true --executor-cores 1
> !py3.6.png! The data is incomplete.
> The exception shown there is printed by logging that we added to Spark ourselves; I will explain it below.
>  
>  
> h3. Investigation
> So I added some logging to trace it. The “process done” message was added in worker.py.
> To capture the exception, we also changed the Spark code under core/src/main/scala/org/apache/spark/util/Utils.scala, adding the following to print the exception:
>  
> @@ -1362,6 +1362,8 @@ private[spark] object Utils extends Logging {
>        case t: Throwable =>
>          // Purposefully not using NonFatal, because even fatal exceptions
>          // we don't want to have our finallyBlock suppress
> +        logInfo(t.getLocalizedMessage)
> +        t.printStackTrace()
>          originalThrowable = t
>          throw originalThrowable
>      } finally {
>  
> It seems PySpark gets the data from the JVM, but pyarrow receives it incompletely. The pyarrow side thinks the data is finished and shuts down the socket. At the same time, the JVM side is still writing to that socket and gets a socket-closed exception.
> The pyarrow part is in ipc.pxi:
>  
> cdef class _RecordBatchReader:
>     cdef:
>         shared_ptr[CRecordBatchReader] reader
>         shared_ptr[InputStream] in_stream
>  
>     cdef readonly:
>         Schema schema
>  
>     def __cinit__(self):
>         pass
>  
>     def _open(self, source):
>         get_input_stream(source, &self.in_stream)
>         with nogil:
>             check_status(CRecordBatchStreamReader.Open(
>                 self.in_stream.get(), &self.reader))
>         self.schema = pyarrow_wrap_schema(self.reader.get().schema())
>  
>     def __iter__(self):
>         while True:
>             yield self.read_next_batch()
>  
>     def get_next_batch(self):
>         import warnings
>         warnings.warn('Please use read_next_batch instead of '
>                       'get_next_batch', FutureWarning)
>         return self.read_next_batch()
>  
>     def read_next_batch(self):
>         """
>         Read next RecordBatch from the stream. Raises StopIteration at end of
>         stream
>         """
>         cdef shared_ptr[CRecordBatch] batch
>         with nogil:
>             check_status(self.reader.get().ReadNext(&batch))
>         if batch.get() == NULL:
>             raise StopIteration
>         return pyarrow_wrap_batch(batch)
>  
> The read_next_batch function gets NULL here and concludes that the iterator is over.
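>  
> For reference, a minimal standalone pyarrow example of the stream-reader behaviour described above: the reader keeps returning batches until the end-of-stream marker, at which point ReadNext returns NULL and iteration stops, exactly as in read_next_batch.
>  
> import pyarrow as pa
>  
> # write a tiny record batch stream into a buffer, then read it back
> batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=['a'])
> sink = pa.BufferOutputStream()
> writer = pa.RecordBatchStreamWriter(sink, batch.schema)
> writer.write_batch(batch)
> writer.close()                              # close() writes the end-of-stream marker
>  
> reader = pa.RecordBatchStreamReader(pa.BufferReader(sink.getvalue()))
> for b in reader:                            # iteration stops (StopIteration) once the
>     print(b.num_rows)                       # end-of-stream marker is read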
>  
> h3. RESULT
> Our environment is Spark 2.4.3; we have tried pyarrow 0.10.0 and 0.14.0 with Python 2.7, 3.5 and 3.6.
> With Python 2.7 everything is fine, but after changing to Python 3.5 or 3.6 the data is wrong.
> The number of columns is critical to triggering this bug: with fewer than 5 columns it probably will not happen, but with a larger number of columns, for example 7 or more, it happens every time.
> So we wonder whether there is some conflict between Python 3.x and these pyarrow versions?
> I have attached the code and data.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)