Posted to issues@spark.apache.org by "Anton Kholodkov (Jira)" <ji...@apache.org> on 2021/09/21 11:39:00 UTC
[jira] [Commented] (SPARK-36600) Reduce memory consumption with PySpark createDataFrame
[ https://issues.apache.org/jira/browse/SPARK-36600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17418052#comment-17418052 ]
Anton Kholodkov commented on SPARK-36600:
-----------------------------------------
Hello! I would like to help with the task. Please assign it to me, if possible
> Reduce memory consumption with PySpark createDataFrame
> ------------------------------------------------------
>
> Key: SPARK-36600
> URL: https://issues.apache.org/jira/browse/SPARK-36600
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 3.1.2
> Reporter: Philippe Prados
> Priority: Trivial
> Labels: easyfix
> Attachments: optimize_memory_pyspark.patch
>
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> The Python method {{SparkSession._createFromLocal()}} starts by materializing the data, creating a list if it is not already an instance of list. But this is necessary only when the schema is not present.
> {quote}# make sure data could consumed multiple times
> if not isinstance(data, list):
>     data = list(data)
> {quote}
> If you use {{createDataFrame(data=_a_generator_, ...)}}, all the data is first saved in memory as a list, then converted to rows in memory, then converted to a buffer in pickle format, etc.
> Two lists are present in memory at the same time: the list created by _createFromLocal() and the list created later with
> {quote}# convert python objects to sql data
> data = [schema.toInternal(row) for row in data]
> {quote}
> The purpose of using a generator is to reduce the memory footprint when the data is built dynamically.
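The double materialization described above can be sketched in plain Python. This is an illustrative reconstruction, not the real PySpark internals: `rows()` and the `tuple(row)` conversion stand in for the caller's generator and for `schema.toInternal(row)`.

```python
# Hypothetical sketch of the current behavior: a generator passed to
# createDataFrame is materialized into a list, and a second list of
# converted rows is then built while the first is still referenced.

def rows():
    # stand-in for a caller's dynamically built data
    for i in range(3):
        yield (i, str(i))

data = rows()

# Step 1: _createFromLocal() materializes the generator unconditionally.
if not isinstance(data, list):
    data = list(data)          # first full copy in memory

# Step 2: a second full list is built from the first.
converted = [tuple(row) for row in data]   # stand-in for schema.toInternal(row)

# both lists coexist at this point
assert data == [(0, '0'), (1, '1'), (2, '2')]
assert converted == data
```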
> {quote}def _createFromLocal(self, data, schema):
>     """
>     Create an RDD for DataFrame from a list or pandas.DataFrame, returns
>     the RDD and schema.
>     """
>     if schema is None or isinstance(schema, (list, tuple)):
>         *# make sure data could consumed multiple times*
>         *if inspect.isgenerator(data):*
>             *data = list(data)*
>         struct = self._inferSchemaFromList(data, names=schema)
>         converter = _create_converter(struct)
>         data = map(converter, data)
>         if isinstance(schema, (list, tuple)):
>             for i, name in enumerate(schema):
>                 struct.fields[i].name = name
>                 struct.names[i] = name
>         schema = struct
>     elif not isinstance(schema, StructType):
>         raise TypeError("schema should be StructType or list or None, but got: %s" % schema)
>
>     # convert python objects to sql data
>     data = [schema.toInternal(row) for row in data]
>     return self._sc.parallelize(data), schema{quote}
> With this change, it becomes worthwhile to pass a generator.
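The proposed flow can be sketched as follows. This is an illustrative reconstruction under stated assumptions: `rows()`, the `"explicit"` schema placeholder, and the final list comprehension stand in for the caller's generator, a real StructType, and `schema.toInternal(row)`. When a schema is supplied, the generator is never materialized early, so only the final converted list ever exists in memory.

```python
import types

def rows():
    # stand-in for dynamically built data
    for i in range(3):
        yield (i, i * i)

data = rows()
schema = "explicit"        # stands in for a StructType

# proposed behavior: materialize only when schema inference
# needs to consume the data more than once
if schema is None:
    data = list(data)

# single pass: generator -> converted list, one list total
converted = [row for row in data]   # stand-in for schema.toInternal(row)

assert isinstance(data, types.GeneratorType)   # never became a list
assert converted == [(0, 0), (1, 1), (2, 4)]
```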
>
> {quote}The patch:
> diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
> index 57c680fd04..0dba590451 100644
> --- a/python/pyspark/sql/session.py
> +++ b/python/pyspark/sql/session.py
> @@ -15,6 +15,7 @@
>  # limitations under the License.
>  #
>
> +import inspect
>  import sys
>  import warnings
>  from functools import reduce
> @@ -504,11 +505,11 @@ class SparkSession(SparkConversionMixin):
>          Create an RDD for DataFrame from a list or pandas.DataFrame, returns
>          the RDD and schema.
>          """
> -        # make sure data could consumed multiple times
> -        if not isinstance(data, list):
> -            data = list(data)
>
>          if schema is None or isinstance(schema, (list, tuple)):
> +            # make sure data could consumed multiple times
> +            if inspect.isgenerator(data):  # PPR
> +                data = list(data)
>              struct = self._inferSchemaFromList(data, names=schema)
>              converter = _create_converter(struct)
>              data = map(converter, data)
> {quote}
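One subtlety worth flagging for whoever picks this up: the object passed as `data` is a generator *object*, not a generator *function*, so the check must use `inspect.isgenerator` rather than `inspect.isgeneratorfunction` (which would always return False here and skip the materialization that schema inference needs). A minimal demonstration:

```python
import inspect

def gen():
    yield 1

g = gen()   # what createDataFrame actually receives is an object like g

# isgeneratorfunction tests the function, not the object it produces
assert inspect.isgeneratorfunction(gen) is True
assert inspect.isgeneratorfunction(g) is False

# isgenerator is the check that matches the data argument
assert inspect.isgenerator(g) is True
```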
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org