Posted to issues@spark.apache.org by "Philippe Prados (Jira)" <ji...@apache.org> on 2021/08/26 11:28:00 UTC
[jira] [Created] (SPARK-36600) Reduce memory consumption with PySpark createDataFrame
Philippe Prados created SPARK-36600:
---------------------------------------
Summary: Reduce memory consumption with PySpark createDataFrame
Key: SPARK-36600
URL: https://issues.apache.org/jira/browse/SPARK-36600
Project: Spark
Issue Type: Improvement
Components: PySpark
Affects Versions: 3.1.2
Reporter: Philippe Prados
The Python method {{SparkSession._createFromLocal()}} starts by materializing the data, creating a list if it is not already an instance of list. But this is necessary only if the schema is not present.
{quote}# make sure data could consumed multiple times
if not isinstance(data, list):
    data = list(data)
{quote}
If you use {{createDataFrame(data=_a_generator_, ...)}}, all the data are first saved in memory in a list, then converted to rows in memory, then converted to a buffer in pickle format, etc.
Two lists are therefore present in memory at the same time: the one created by _createFromLocal()_ and the one created later with
{quote}# convert python objects to sql data
data = [schema.toInternal(row) for row in data]
{quote}
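The double buffering above can be sketched in isolation with plain tuples and a hypothetical stand-in converter (the real {{schema.toInternal}} needs a live schema, so this only illustrates the memory shape):

```python
# Sketch of the current behaviour: the input is materialized once up front,
# then a second full list is built during conversion, so both lists coexist.
def rows():
    for i in range(1000):
        yield (i, str(i))

data = list(rows())                   # first full copy (the eager list())
internal = [(i, s) for i, s in data]  # second full copy (stand-in for toInternal)
assert len(data) == len(internal) == 1000
assert internal[0] == (0, "0")
```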
The purpose of using a generator is to reduce the memory footprint when the data are built dynamically.
{quote}def _createFromLocal(self, data, schema):
    """
    Create an RDD for DataFrame from a list or pandas.DataFrame, returns
    the RDD and schema.
    """
    if schema is None or isinstance(schema, (list, tuple)):
        # make sure data could consumed multiple times
        if inspect.isgenerator(data):
            data = list(data)
        struct = self._inferSchemaFromList(data, names=schema)
        converter = _create_converter(struct)
        data = map(converter, data)
        if isinstance(schema, (list, tuple)):
            for i, name in enumerate(schema):
                struct.fields[i].name = name
                struct.names[i] = name
        schema = struct
    elif not isinstance(schema, StructType):
        raise TypeError("schema should be StructType or list or None, but got: %s" % schema)
    # convert python objects to sql data
    data = [schema.toInternal(row) for row in data]
    return self._sc.parallelize(data), schema{quote}
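A caveat worth noting, since the two {{inspect}} predicates are easy to confuse: {{inspect.isgeneratorfunction}} is true for the {{def}} that produces generators, while {{inspect.isgenerator}} is true for the generator object a caller would actually pass as {{data}}. A quick check:

```python
import inspect

def gen():
    yield 1

# The def itself is a generator *function*, not a generator.
assert inspect.isgeneratorfunction(gen)
assert not inspect.isgenerator(gen)

# Calling it produces a generator object -- what createDataFrame would receive.
g = gen()
assert inspect.isgenerator(g)
assert not inspect.isgeneratorfunction(g)
```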
Accepting a generator directly is therefore worthwhile.
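One constraint the inference branch must still respect: a generator can be consumed only once, so when the schema has to be inferred in one pass and the rows converted in another, materializing with {{list(data)}} is unavoidable. A quick illustration:

```python
# A generator is single-use: once iterated, it is exhausted.
def rows():
    yield 1
    yield 2

g = rows()
assert list(g) == [1, 2]  # first pass consumes everything
assert list(g) == []      # second pass sees nothing
```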
The patch:
{quote}
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 57c680fd04..0dba590451 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
+import inspect
 import sys
 import warnings
 from functools import reduce
@@ -504,11 +505,11 @@ class SparkSession(SparkConversionMixin):
         Create an RDD for DataFrame from a list or pandas.DataFrame, returns
         the RDD and schema.
         """
-        # make sure data could consumed multiple times
-        if not isinstance(data, list):
-            data = list(data)
         if schema is None or isinstance(schema, (list, tuple)):
+            # make sure data could consumed multiple times
+            if inspect.isgenerator(data):  # PPR
+                data = list(data)
             struct = self._inferSchemaFromList(data, names=schema)
             converter = _create_converter(struct)
             data = map(converter, data)
{quote}
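With the patch, when the caller supplies an explicit schema, a non-list iterable flows lazily through {{map()}} and is materialized only once, at the {{toInternal}} step. A minimal stand-in (hypothetical converter functions, no Spark required):

```python
# Stand-in for the patched flow with an explicit schema: no eager list(),
# a lazy converter, and a single materialization at the end.
def rows():
    for i in range(5):
        yield (i, i * i)

converted = map(lambda r: (r[0], r[1] + 1), rows())  # stand-in for _create_converter
internal = list(converted)                           # stand-in for the toInternal pass
assert internal == [(0, 1), (1, 2), (2, 5), (3, 10), (4, 17)]
```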
--
This message was sent by Atlassian Jira
(v8.3.4#803005)