Posted to commits@spark.apache.org by cu...@apache.org on 2020/06/25 18:19:00 UTC

[spark] branch branch-2.4 updated: [SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow

This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a295003  [SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow
a295003 is described below

commit a295003d48df719fb92c1ee3547dbfd7df85424b
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Thu Jun 25 11:04:47 2020 -0700

    [SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow
    
    When you use floats as the index of a pandas DataFrame, `createDataFrame` produces a Spark DataFrame with wrong results when Arrow is enabled, as shown below:
    
    ```bash
    ./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
    ```
    
    ```python
    >>> import pandas as pd
    >>> spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
    +---+
    |  a|
    +---+
    |  1|
    |  1|
    |  2|
    +---+
    ```
    
    This is because direct slicing uses the value as the index label when the index contains floats:
    
    ```python
    >>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])[2:]
         a
    2.0  1
    3.0  2
    4.0  3
    >>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.]).iloc[2:]
         a
    4.0  3
    >>> pd.DataFrame({'a': [1,2,3]}, index=[2, 3, 4])[2:]
       a
    4  3
    ```
    
    This PR proposes to explicitly use `iloc` to slice positionally when we create a DataFrame from a pandas DataFrame with Arrow enabled.
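    
    A minimal standalone sketch of the batching slice this patch changes, mirroring the generator in `session.py` (`pdf` and `step` here are illustrative values, not the actual runtime parallelism):
    
    ```python
    import pandas as pd
    
    pdf = pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])
    step = 2  # in session.py this is derived from defaultParallelism
    
    # Old behaviour: pdf[start:start + step] slices by label on a float index,
    # so batches overlap and rows get duplicated across Arrow record batches.
    label_batches = [pdf[start:start + step] for start in range(0, len(pdf), step)]
    print([b['a'].tolist() for b in label_batches])   # label-based, as shown above: [[1], [1, 2, 3]]
    
    # Fixed behaviour: iloc slices by position, so each row lands in exactly one batch.
    iloc_batches = [pdf.iloc[start:start + step] for start in range(0, len(pdf), step)]
    print([b['a'].tolist() for b in iloc_batches])    # [[1, 2], [3]]
    ```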
    
    FWIW, I was trying to investigate why direct slicing sometimes refers to the index label and sometimes to the positional index, but I stopped investigating further after reading https://pandas.pydata.org/pandas-docs/stable/getting_started/10min.html#selection:
    
    > While standard Python / Numpy expressions for selecting and setting are intuitive and come in handy for interactive work, for production code, we recommend the optimized pandas data access methods, `.at`, `.iat`, `.loc` and `.iloc`.
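    
    As a quick illustration of those accessors on the same float-indexed frame (hypothetical snippet, not part of this patch):
    
    ```python
    import pandas as pd
    
    pdf = pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])
    
    pdf.loc[2.0]       # label-based: the row whose index label is 2.0 (a == 1)
    pdf.iloc[2]        # position-based: the third row (a == 3)
    pdf.at[3.0, 'a']   # scalar label lookup -> 2
    pdf.iat[0, 0]      # scalar positional lookup -> 1
    ```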
    
    To create the correct Spark DataFrame from a pandas DataFrame without data loss.
    
    Yes, it is a bug fix.
    
    ```bash
    ./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
    ```
    ```python
    import pandas as pd
    spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
    ```
    
    Before:
    
    ```
    +---+
    |  a|
    +---+
    |  1|
    |  1|
    |  2|
    +---+
    ```
    
    After:
    
    ```
    +---+
    |  a|
    +---+
    |  1|
    |  2|
    |  3|
    +---+
    ```
    
    Manually tested, and a unit test was added.
    
    Closes #28928 from HyukjinKwon/SPARK-32098.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: Bryan Cutler <cu...@gmail.com>
    (cherry picked from commit 1af19a7b6836f87a3b34189a8a13b6d21d3a37d8)
    Signed-off-by: Bryan Cutler <cu...@gmail.com>
---
 python/pyspark/sql/session.py | 2 +-
 python/pyspark/sql/tests.py   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index fc18f57..bfb0f1c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -524,7 +524,7 @@ class SparkSession(object):
 
         # Slice the DataFrame to be batched
         step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
-        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+        pdf_slices = (pdf.iloc[start:start + step] for start in xrange(0, len(pdf), step))
 
         # Create Arrow record batches
         batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)],
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1bbf08d..49acf04 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4572,6 +4572,13 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertPandasEqual(pdf, df_from_python.toPandas())
         self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_createDataFrame_with_float_index(self):
+        import pandas as pd
+        # SPARK-32098: float index should not produce duplicated or truncated Spark DataFrame
+        self.assertEqual(
+            self.spark.createDataFrame(
+                pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])).distinct().count(), 3)
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,

