Posted to issues@spark.apache.org by "Anton Kholodkov (Jira)" <ji...@apache.org> on 2021/09/21 11:39:00 UTC

[jira] [Commented] (SPARK-36600) reduces memory consumption win Pyspark CreateDataFrame

    [ https://issues.apache.org/jira/browse/SPARK-36600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418052#comment-17418052 ] 

Anton Kholodkov commented on SPARK-36600:
-----------------------------------------

Hello! I would like to help with this task. Please assign it to me, if possible.

> reduces memory consumption win Pyspark CreateDataFrame
> ------------------------------------------------------
>
>                 Key: SPARK-36600
>                 URL: https://issues.apache.org/jira/browse/SPARK-36600
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 3.1.2
>            Reporter: Philippe Prados
>            Priority: Trivial
>              Labels: easyfix
>         Attachments: optimize_memory_pyspark.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
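> The Python method {{SparkSession._createFromLocal()}} starts by converting the data to a list if it is not already an instance of list. This is necessary only when no schema is provided, because the data must then be traversed more than once (once to infer the schema, once to convert the rows).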
> {quote}# make sure data could consumed multiple times
>  if not isinstance(data, list):
>   data = list(data)
> {quote}
> If you call {{createDataFrame(data=_a_generator_,...)}}, all the data is first saved in memory in a list, then converted to rows in memory, then converted to a buffer in pickle format, etc.
> Two lists are then present in memory at the same time: the list created by _createFromLocal() and the list created later with
> {quote}# convert python objects to sql data
> data = [schema.toInternal(row) for row in data]
> {quote}
> The purpose of using a generator is to reduce the memory footprint when the data is built dynamically.
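
A minimal sketch of why the list copy matters only when the schema has to be inferred: a generator can be traversed once, so any code that needs two passes must materialize it first. The `rows()` generator below is a hypothetical stand-in for dynamically built data and is not part of Spark.

```python
def rows():
    """Hypothetical data source that builds rows on demand."""
    for i in range(3):
        yield (i, str(i))

gen = rows()

# First pass: consumes the generator completely.
first_pass = list(gen)

# Second pass: the generator is already exhausted, so nothing is left.
second_pass = list(gen)
```

When a full schema is supplied, the rows are traversed only once, so the intermediate list is not needed for correctness.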
> {quote}def _createFromLocal(self, data, schema):
>   """
>   Create an RDD for DataFrame from a list or pandas.DataFrame, returns
>   the RDD and schema.
>   """
>   if schema is None or isinstance(schema, (list, tuple)):
>     *# make sure data could consumed multiple times*
>     *if inspect.isgeneratorfunction(data):* 
>       *data = list(data)*
>     struct = self._inferSchemaFromList(data, names=schema)
>     converter = _create_converter(struct)
>     data = map(converter, data)
>     if isinstance(schema, (list, tuple)):
>       for i, name in enumerate(schema):
>         struct.fields[i].name = name
>         struct.names[i] = name
>     schema = struct
>   elif not isinstance(schema, StructType):
>     raise TypeError("schema should be StructType or list or None, but got: %s" % schema)
>   # convert python objects to sql data
>   data = [schema.toInternal(row) for row in data]
>   return self._sc.parallelize(data), schema{quote}
> With this change, it becomes worthwhile to pass a generator.
>  
> {quote}The patch:
> diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
> index 57c680fd04..0dba590451 100644
> --- a/python/pyspark/sql/session.py
> +++ b/python/pyspark/sql/session.py
> @@ -15,6 +15,7 @@
>  # limitations under the License.
>  #
>  
> +import inspect
>  import sys
>  import warnings
>  from functools import reduce
> @@ -504,11 +505,11 @@ class SparkSession(SparkConversionMixin):
>          Create an RDD for DataFrame from a list or pandas.DataFrame, returns
>          the RDD and schema.
>          """
> -        # make sure data could consumed multiple times
> -        if not isinstance(data, list):
> -            data = list(data)
>  
>          if schema is None or isinstance(schema, (list, tuple)):
> +            # make sure data could consumed multiple times
> +            if inspect.isgeneratorfunction(data): # PPR
> +                data = list(data)
>              struct = self._inferSchemaFromList(data, names=schema)
>              converter = _create_converter(struct)
>              data = map(converter, data)
> {quote}
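
One point worth checking in the patch above: in the Python standard library, `inspect.isgeneratorfunction()` returns True for a function that defines a generator, not for the generator object obtained by calling it. The sketch below shows the difference, using a hypothetical `rows()` generator function that is not part of Spark. If the intent is to detect a generator passed as `data`, `inspect.isgenerator()` or the original `not isinstance(data, list)` test may be closer to what is wanted. This is an observation about the standard library, not a tested change to Spark.

```python
import inspect

def rows():
    """Hypothetical generator function standing in for dynamically built data."""
    for i in range(3):
        yield (i, f"name-{i}")

gen = rows()  # calling the function returns a generator object

# The check used in the patch, applied to the generator object.
function_check_on_object = inspect.isgeneratorfunction(gen)

# The check that targets generator objects.
generator_check_on_object = inspect.isgenerator(gen)

# isgeneratorfunction() matches the function itself, not its result.
function_check_on_function = inspect.isgeneratorfunction(rows)
```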


