Posted to commits@spark.apache.org by gu...@apache.org on 2017/09/17 17:34:49 UTC
spark git commit: [SPARK-22032][PYSPARK] Speed up StructType conversion
Repository: spark
Updated Branches:
refs/heads/master 73d906722 -> f4073020a
[SPARK-22032][PYSPARK] Speed up StructType conversion
## What changes were proposed in this pull request?
StructType.fromInternal calls f.fromInternal(v) for every field.
We can use precalculated type information to limit the number of function calls (it is computed once per StructType and reused for every record conversion).
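The idea can be sketched outside Spark with minimal stand-in classes (Field and Struct below are illustrative, not the real PySpark types):

```python
# Minimal sketch of the precalculation idea, using hypothetical stand-in
# classes rather than the actual pyspark.sql.types implementations.

class Field:
    def __init__(self, needs_conversion):
        self._needs = needs_conversion

    def needConversion(self):
        return self._needs

    def fromInternal(self, v):
        # Pretend conversion; a real field would e.g. turn microseconds
        # into a datetime object.
        return ("converted", v)

class Struct:
    def __init__(self, fields):
        self.fields = fields
        # Calculated once per struct, reused for every record.
        self._needConversion = [f.needConversion() for f in fields]
        self._needSerializeAnyField = any(self._needConversion)

    def fromInternal(self, obj):
        if not self._needSerializeAnyField:
            return list(obj)
        # Only call fromInternal for the fields that actually need it;
        # the others are passed through untouched.
        return [f.fromInternal(v) if c else v
                for f, v, c in zip(self.fields, obj, self._needConversion)]

s = Struct([Field(False), Field(True), Field(False)])
print(s.fromInternal([1, 2, 3]))  # [1, ('converted', 2), 3]
```

With the old approach every field would pay a method call; here only the one field with needConversion() == True does.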
Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```
Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
10000000 253.417 0.000 486.991 0.000 types.py:619(<listcomp>)
30000000 192.272 0.000 1009.986 0.000 types.py:612(fromInternal)
100000000 176.140 0.000 176.140 0.000 types.py:88(fromInternal)
20000000 156.832 0.000 328.093 0.000 types.py:1471(_create_row)
14000 107.206 0.008 1237.917 0.088 {built-in method loads}
20000000 80.176 0.000 1090.162 0.000 types.py:1468(<lambda>)
```
After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
30000000 215.845 0.000 698.748 0.000 types.py:612(fromInternal)
20000000 165.042 0.000 351.572 0.000 types.py:1471(_create_row)
14000 116.834 0.008 946.791 0.068 {built-in method loads}
20000000 87.326 0.000 786.073 0.000 types.py:1468(<lambda>)
20000000 85.477 0.000 134.607 0.000 types.py:1519(__new__)
10000000 65.777 0.000 126.712 0.000 types.py:619(<listcomp>)
```
The main difference is in types.py:619(<listcomp>) and types.py:88(fromInternal), which no longer appears in the After profile.
There are 100 million fewer function calls, and performance is about 20% better.
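The saving can be reproduced with a plain-Python micro-benchmark; the field classes below are stand-ins mirroring the benchmarked schema (ten pass-through columns plus one that needs conversion), not PySpark's actual types:

```python
import timeit

class IdentityField:
    """Stand-in for a field whose value needs no conversion (like LongType)."""
    def needConversion(self):
        return False
    def fromInternal(self, v):
        return v

class ConvertingField:
    """Stand-in for a field that does need conversion (like a struct column)."""
    def needConversion(self):
        return True
    def fromInternal(self, v):
        return ("converted", v)

fields = [IdentityField() for _ in range(10)] + [ConvertingField()]
need = [f.needConversion() for f in fields]  # computed once per schema
row = list(range(11))

def before():
    # Old approach: one fromInternal call per field, for every record.
    return [f.fromInternal(v) for f, v in zip(fields, row)]

def after():
    # New approach: skip the call when the field needs no conversion.
    return [f.fromInternal(v) if c else v
            for f, v, c in zip(fields, row, need)]

assert before() == after()
t_before = timeit.timeit(before, number=100_000)
t_after = timeit.timeit(after, number=100_000)
print(f"before: {t_before:.3f}s  after: {t_after:.3f}s")
```

Both variants produce identical rows; the second simply avoids ten method calls per record in this schema.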
Benchmark (worst-case scenario: every field is a timestamp, so all of them need conversion and the precalculated list cannot skip any calls)
Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```
Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```
After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```
IMPORTANT:
The benchmark was done on top of https://github.com/apache/spark/pull/19246.
Without https://github.com/apache/spark/pull/19246 the performance improvement would be even greater.
## How was this patch tested?
Existing tests.
Performance benchmark.
Author: Maciej Bryński <ma...@brynski.pl>
Closes #19249 from maver1ck/spark_22032.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4073020
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4073020
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4073020
Branch: refs/heads/master
Commit: f4073020adf9752c7d7b39631ec3fa36d6345902
Parents: 73d9067
Author: Maciej Bryński <ma...@brynski.pl>
Authored: Mon Sep 18 02:34:44 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Mon Sep 18 02:34:44 2017 +0900
----------------------------------------------------------------------
python/pyspark/sql/types.py | 22 ++++++++++++++++------
1 file changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f4073020/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 920cf00..aaf520f 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -483,7 +483,9 @@ class StructType(DataType):
self.names = [f.name for f in fields]
assert all(isinstance(f, StructField) for f in fields),\
"fields should be a list of StructField"
- self._needSerializeAnyField = any(f.needConversion() for f in self)
+ # Precalculated list of fields that need conversion with fromInternal/toInternal functions
+ self._needConversion = [f.needConversion() for f in self]
+ self._needSerializeAnyField = any(self._needConversion)
def add(self, field, data_type=None, nullable=True, metadata=None):
"""
@@ -528,7 +530,9 @@ class StructType(DataType):
data_type_f = data_type
self.fields.append(StructField(field, data_type_f, nullable, metadata))
self.names.append(field)
- self._needSerializeAnyField = any(f.needConversion() for f in self)
+ # Precalculated list of fields that need conversion with fromInternal/toInternal functions
+ self._needConversion = [f.needConversion() for f in self]
+ self._needSerializeAnyField = any(self._needConversion)
return self
def __iter__(self):
@@ -590,13 +594,17 @@ class StructType(DataType):
return
if self._needSerializeAnyField:
+ # Only calling toInternal function for fields that need conversion
if isinstance(obj, dict):
- return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
+ return tuple(f.toInternal(obj.get(n)) if c else obj.get(n)
+ for n, f, c in zip(self.names, self.fields, self._needConversion))
elif isinstance(obj, (tuple, list)):
- return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
+ return tuple(f.toInternal(v) if c else v
+ for f, v, c in zip(self.fields, obj, self._needConversion))
elif hasattr(obj, "__dict__"):
d = obj.__dict__
- return tuple(f.toInternal(d.get(n)) for n, f in zip(self.names, self.fields))
+ return tuple(f.toInternal(d.get(n)) if c else d.get(n)
+ for n, f, c in zip(self.names, self.fields, self._needConversion))
else:
raise ValueError("Unexpected tuple %r with StructType" % obj)
else:
@@ -619,7 +627,9 @@ class StructType(DataType):
# it's already converted by pickler
return obj
if self._needSerializeAnyField:
- values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
+ # Only calling fromInternal function for fields that need conversion
+ values = [f.fromInternal(v) if c else v
+ for f, v, c in zip(self.fields, obj, self._needConversion)]
else:
values = obj
return _create_row(self.names, values)