Posted to issues@spark.apache.org by "Philippe Prados (Jira)" <ji...@apache.org> on 2021/08/26 11:28:00 UTC

[jira] [Created] (SPARK-36600) Reduce memory consumption in PySpark createDataFrame

Philippe Prados created SPARK-36600:
---------------------------------------

             Summary: Reduce memory consumption in PySpark createDataFrame
                 Key: SPARK-36600
                 URL: https://issues.apache.org/jira/browse/SPARK-36600
             Project: Spark
          Issue Type: Improvement
          Components: PySpark
    Affects Versions: 3.1.2
            Reporter: Philippe Prados


The Python method {{SparkSession._createFromLocal()}} starts by converting the data to a list if it is not already an instance of list. But this is necessary only when the schema is not present and has to be inferred from the data.
{quote}# make sure data could consumed multiple times
 if not isinstance(data, list):
  data = list(data)
{quote}
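The reason for the list conversion can be shown without Spark. This is a minimal sketch, assuming that schema inference needs one full pass over the rows and that conversion needs a second one; {{infer_field_count}}, {{convert}} and {{gen}} are hypothetical stand-ins, not PySpark functions.

```python
def infer_field_count(rows):
    """Hypothetical stand-in for schema inference: one full pass over the rows."""
    return max(len(row) for row in rows)


def convert(rows):
    """Hypothetical stand-in for row conversion: a second full pass."""
    return [tuple(row) for row in rows]


def gen():
    """Hypothetical data source that builds rows on demand."""
    yield [1, "a"]
    yield [2, "b"]


# A generator can be consumed only once: inference uses it up,
# so the conversion pass sees no rows at all.
one_shot = gen()
width_from_generator = infer_field_count(one_shot)
rows_from_generator = convert(one_shot)

# A list can be traversed twice, which is why the data is materialized
# when the schema has to be inferred.
materialized = list(gen())
width_from_list = infer_field_count(materialized)
rows_from_list = convert(materialized)

print(width_from_generator, rows_from_generator)
print(width_from_list, rows_from_list)
```

When the schema is given explicitly, there is no inference pass, so in this sketch a single traversal would be enough.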
If you call {{createDataFrame(data=_a_generator_,...)}}, all the data is first stored in memory in a list, then converted to rows in memory, then converted to a buffer in pickle format, etc.

Two lists are present in memory at the same time: the list created by {{_createFromLocal()}} and the list created later with
{quote}# convert python objects to sql data
data = [schema.toInternal(row) for row in data]
{quote}
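The cost of holding both lists can be estimated with the standard {{tracemalloc}} module. This is a rough sketch, not a measurement of Spark itself: {{rows}} is a hypothetical data source and {{to_internal}} is a stand-in for {{schema.toInternal(row)}}, assumed to build a new object per row.

```python
import tracemalloc


def rows(n):
    """Hypothetical data source: yields rows one at a time."""
    for i in range(n):
        yield (i, "name-%d" % i)


def to_internal(row):
    """Stand-in for schema.toInternal(row): builds a new object per row."""
    return (row[0], row[1].upper())


def eager(n):
    # Current behaviour: materialize the input, then build the converted list.
    data = list(rows(n))
    return [to_internal(r) for r in data]


def lazy(n):
    # Proposed behaviour with an explicit schema: convert rows as they arrive.
    return [to_internal(r) for r in rows(n)]


def peak_bytes(build):
    """Run build() and return (peak traced memory in bytes, number of rows)."""
    tracemalloc.start()
    try:
        result = build()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak, len(result)


peak_eager, count_eager = peak_bytes(lambda: eager(50_000))
peak_lazy, count_lazy = peak_bytes(lambda: lazy(50_000))

print("eager peak:", peak_eager, "bytes for", count_eager, "rows")
print("lazy peak: ", peak_lazy, "bytes for", count_lazy, "rows")
```

The exact numbers depend on the Python version and the row contents, but the eager variant keeps the input rows and the converted rows alive together, so its peak should be noticeably higher.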
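The purpose of using a generator is to reduce the memory footprint when the data is built dynamically. The list is only needed when the schema has to be inferred, so the conversion can move inside that branch: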
{quote}def _createFromLocal(self, data, schema):
  """
  Create an RDD for DataFrame from a list or pandas.DataFrame, returns
  the RDD and schema.
  """

  if schema is None or isinstance(schema, (list, tuple)):
    *# make sure data could consumed multiple times*
    *if inspect.isgeneratorfunction(data):* 
      *data = list(data)*
    struct = self._inferSchemaFromList(data, names=schema)
    converter = _create_converter(struct)
    data = map(converter, data)
    if isinstance(schema, (list, tuple)):
      for i, name in enumerate(schema):
        struct.fields[i].name = name
        struct.names[i] = name
    schema = struct

  elif not isinstance(schema, StructType):
    raise TypeError("schema should be StructType or list or None, but got: %s" % schema)

  # convert python objects to sql data
  data = [schema.toInternal(row) for row in data]
  return self._sc.parallelize(data), schema{quote}
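With this change, using a generator becomes worthwhile when an explicit schema is given, because the data is no longer copied into an intermediate list.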
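The proposed check relies on the standard {{inspect}} module, which has two different generator predicates. The sketch below, independent of Spark, shows how they behave; {{make_rows}} is a hypothetical name. A generator object, which is what {{createDataFrame(data=...)}} would typically receive, is recognized by {{inspect.isgenerator}} rather than {{inspect.isgeneratorfunction}}, so the condition in the proposal may need adjusting, for example by keeping the original {{not isinstance(data, list)}} test inside the branch.

```python
import inspect


def make_rows():
    """Hypothetical generator function producing two rows."""
    yield (1, "a")
    yield (2, "b")


rows_gen = make_rows()      # calling the function returns a generator object
rows_list = [(1, "a")]      # a plain list, for comparison

# isgeneratorfunction() is true only for the function itself.
function_checks = (
    inspect.isgeneratorfunction(make_rows),
    inspect.isgeneratorfunction(rows_gen),
    inspect.isgeneratorfunction(rows_list),
)

# isgenerator() is true only for the generator object.
object_checks = (
    inspect.isgenerator(make_rows),
    inspect.isgenerator(rows_gen),
    inspect.isgenerator(rows_list),
)

print("isgeneratorfunction:", function_checks)
print("isgenerator:        ", object_checks)
```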

 
{quote}The patch:

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 57c680fd04..0dba590451 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import inspect
 import sys
 import warnings
 from functools import reduce
@@ -504,11 +505,11 @@ class SparkSession(SparkConversionMixin):
         Create an RDD for DataFrame from a list or pandas.DataFrame, returns
         the RDD and schema.
         """
-        # make sure data could consumed multiple times
-        if not isinstance(data, list):
-            data = list(data)
 
         if schema is None or isinstance(schema, (list, tuple)):
+            # make sure data could consumed multiple times
+            if inspect.isgeneratorfunction(data): # PPR
+                data = list(data)
             struct = self._inferSchemaFromList(data, names=schema)
             converter = _create_converter(struct)
             data = map(converter, data)
{quote}


