You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by cu...@apache.org on 2020/06/25 18:12:46 UTC
[spark] branch branch-3.0 updated: [SPARK-32098][PYTHON] Use iloc
for positional slicing instead of direct slicing in createDataFrame with
Arrow
This is an automated email from the ASF dual-hosted git repository.
cutlerb pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2d6232a [SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow
2d6232a is described below
commit 2d6232aac476e8afe323fd56d461fadaf2550bb1
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Thu Jun 25 11:04:47 2020 -0700
[SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow
When you use floats are index of pandas, it creates a Spark DataFrame with a wrong results as below when Arrow is enabled:
```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```
```python
>>> import pandas as pd
>>> spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
+---+
| a|
+---+
| 1|
| 1|
| 2|
+---+
```
This is because direct slicing uses the value as index when the index contains floats:
```python
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])[2:]
a
2.0 1
3.0 2
4.0 3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.]).iloc[2:]
a
4.0 3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2, 3, 4])[2:]
a
4 3
```
This PR proposes to explicitly use `iloc` to positionally slide when we create a DataFrame from a pandas DataFrame with Arrow enabled.
FWIW, I was trying to investigate why direct slicing refers the index value or the positional index sometimes but I stopped investigating further after reading this https://pandas.pydata.org/pandas-docs/stable/getting_started/10min.html#selection
> While standard Python / Numpy expressions for selecting and setting are intuitive and come in handy for interactive work, for production code, we recommend the optimized pandas data access methods, `.at`, `.iat`, `.loc` and `.iloc`.
To create the correct Spark DataFrame from a pandas DataFrame without a data loss.
Yes, it is a bug fix.
```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```
```python
import pandas as pd
spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
```
Before:
```
+---+
| a|
+---+
| 1|
| 1|
| 2|
+---+
```
After:
```
+---+
| a|
+---+
| 1|
| 2|
| 3|
+---+
```
Manually tested and unittest were added.
Closes #28928 from HyukjinKwon/SPARK-32098.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: Bryan Cutler <cu...@gmail.com>
(cherry picked from commit 1af19a7b6836f87a3b34189a8a13b6d21d3a37d8)
Signed-off-by: Bryan Cutler <cu...@gmail.com>
---
python/pyspark/sql/pandas/conversion.py | 2 +-
python/pyspark/sql/tests/test_arrow.py | 6 ++++++
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 251625a..e6d8e9f 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -413,7 +413,7 @@ class SparkConversionMixin(object):
# Slice the DataFrame to be batched
step = -(-len(pdf) // self.sparkContext.defaultParallelism) # round int up
- pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+ pdf_slices = (pdf.iloc[start:start + step] for start in xrange(0, len(pdf), step))
# Create list of Arrow (columns, type) for serializer dump_stream
arrow_data = [[(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 004c79f..1386f8d 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -415,6 +415,12 @@ class ArrowTests(ReusedSQLTestCase):
for case in cases:
run_test(*case)
+ def test_createDataFrame_with_float_index(self):
+ # SPARK-32098: float index should not produce duplicated or truncated Spark DataFrame
+ self.assertEqual(
+ self.spark.createDataFrame(
+ pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])).distinct().count(), 3)
+
@unittest.skipIf(
not have_pandas or not have_pyarrow,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org