You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/26 19:30:28 UTC

[GitHub] [spark] ueshin commented on a diff in pull request #37564: [SPARK-40135][PS] Support `data` mixed with `index` in DataFrame creation

ueshin commented on code in PR #37564:
URL: https://github.com/apache/spark/pull/37564#discussion_r956361716


##########
python/pyspark/pandas/frame.py:
##########
@@ -411,56 +420,154 @@ class DataFrame(Frame, Generic[T]):
 
     Constructing DataFrame from numpy ndarray:
 
-    >>> df2 = ps.DataFrame(np.random.randint(low=0, high=10, size=(5, 5)),
-    ...                    columns=['a', 'b', 'c', 'd', 'e'])
-    >>> df2  # doctest: +SKIP
+    >>> import numpy as np
+    >>> ps.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     columns=['a', 'b', 'c', 'd', 'e'])
+       a  b  c  d  e
+    0  1  2  3  4  5
+    1  6  7  8  9  0
+
+    Constructing DataFrame from numpy ndarray with Pandas index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+
+    >>> ps.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     index=pd.Index([1, 4]), columns=['a', 'b', 'c', 'd', 'e'])
        a  b  c  d  e
-    0  3  1  4  9  8
-    1  4  8  4  8  4
-    2  7  6  5  6  7
-    3  8  7  9  1  0
-    4  2  5  4  3  9
+    1  1  2  3  4  5
+    4  6  7  8  9  0
+
+    Constructing DataFrame from numpy ndarray with pandas-on-Spark index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+    >>> ps.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     index=ps.Index([1, 4]), columns=['a', 'b', 'c', 'd', 'e'])
+       a  b  c  d  e
+    1  1  2  3  4  5
+    4  6  7  8  9  0
+
+    Constructing DataFrame from Pandas DataFrame with Pandas index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+    >>> pdf = pd.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     columns=['a', 'b', 'c', 'd', 'e'])
+    >>> ps.DataFrame(data=pdf, index=pd.Index([1, 4]))
+         a    b    c    d    e
+    1  6.0  7.0  8.0  9.0  0.0
+    4  NaN  NaN  NaN  NaN  NaN
+
+    Constructing DataFrame from Pandas DataFrame with pandas-on-Spark index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+    >>> pdf = pd.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     columns=['a', 'b', 'c', 'd', 'e'])
+    >>> ps.DataFrame(data=pdf, index=ps.Index([1, 4]))
+         a    b    c    d    e
+    1  6.0  7.0  8.0  9.0  0.0
+    4  NaN  NaN  NaN  NaN  NaN
+
+    Constructing DataFrame from Spark DataFrame with Pandas index:
+
+    >>> import pandas as pd
+    >>> sdf = spark.createDataFrame([("Data", 1), ("Bricks", 2)], ["x", "y"])
+    >>> ps.DataFrame(data=sdf, index=pd.Index([0, 1, 2]))
+    Traceback (most recent call last):
+      ...
+    ValueError: Cannot combine the series or dataframe...'compute.ops_on_diff_frames' option.
+
+    Need to enable 'compute.ops_on_diff_frames' to combine SparkDataFrame and Pandas index
+
+    >>> with ps.option_context("compute.ops_on_diff_frames", True):
+    ...     ps.DataFrame(data=sdf, index=pd.Index([0, 1, 2]))
+            x    y
+    0    Data  1.0
+    1  Bricks  2.0
+    2    None  NaN
+
+    Constructing DataFrame from Spark DataFrame with pandas-on-Spark index:
+
+    >>> import pandas as pd
+    >>> sdf = spark.createDataFrame([("Data", 1), ("Bricks", 2)], ["x", "y"])
+    >>> ps.DataFrame(data=sdf, index=ps.Index([0, 1, 2]))
+    Traceback (most recent call last):
+      ...
+    ValueError: Cannot combine the series or dataframe...'compute.ops_on_diff_frames' option.
+
+    Need to enable 'compute.ops_on_diff_frames' to combine SparkDataFrame and Pandas index
+
+    >>> with ps.option_context("compute.ops_on_diff_frames", True):
+    ...     ps.DataFrame(data=sdf, index=ps.Index([0, 1, 2]))
+            x    y
+    0    Data  1.0
+    1  Bricks  2.0
+    2    None  NaN
     """
 
     def __init__(  # type: ignore[no-untyped-def]
         self, data=None, index=None, columns=None, dtype=None, copy=False
     ):
+        index_assigned = False
         if isinstance(data, InternalFrame):
-            assert index is None
             assert columns is None
             assert dtype is None
             assert not copy
-            internal = data
+            if index is None:
+                internal = data
         elif isinstance(data, SparkDataFrame):
-            assert index is None
             assert columns is None
             assert dtype is None
             assert not copy
-            internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+            if index is None:
+                internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+        elif isinstance(data, ps.DataFrame):
+            assert columns is None
+            assert dtype is None
+            assert not copy
+            if index is None:
+                internal = data._internal.resolved_copy
         elif isinstance(data, ps.Series):
-            assert index is None
             assert columns is None
             assert dtype is None
             assert not copy
-            data = data.to_frame()
-            internal = data._internal
+            if index is None:
+                internal = data.to_frame()._internal.resolved_copy
         else:
-            if isinstance(data, pd.DataFrame):
-                assert index is None
-                assert columns is None
-                assert dtype is None
-                assert not copy
-                pdf = data
-            else:
-                from pyspark.pandas.indexes.base import Index
+            from pyspark.pandas.indexes.base import Index
 
-                if isinstance(index, Index):
-                    raise TypeError(
-                        "The given index cannot be a pandas-on-Spark index. "
-                        "Try pandas index or array-like."
-                    )
-                pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
+            if index is not None and isinstance(index, Index):
+                # with local data, collect ps.Index to driver
+                # to avoid mismatched results between
+                # ps.DataFrame([1, 2], index=ps.Index([1, 2]))
+                # and
+                # pd.DataFrame([1, 2], index=pd.Index([1, 2]))
+                index = index.to_pandas()
+
+            pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
             internal = InternalFrame.from_pandas(pdf)
+            index_assigned = True
+
+        if index is not None and not index_assigned:
+            data_df = ps.DataFrame(data=data, index=None, columns=columns, dtype=dtype, copy=copy)
+            index_ps = ps.Index(index)
+            index_df = index_ps.to_frame()
+
+            # drop un-matched rows in `data`
+            # note that `combine_frames` can not work with a MultiIndex for now

Review Comment:
   Could you also update the comment here, as @itholic mentioned?
   Also, could you raise an error with an appropriate message for that case?



##########
python/pyspark/pandas/frame.py:
##########
@@ -375,6 +373,16 @@ class DataFrame(Frame, Generic[T]):
     copy : boolean, default False
         Copy data from inputs. Only affects DataFrame / 2d ndarray input
 
+    .. versionchanged:: 3.4.0
+    Since 3.4.0, it deals with `data` and `index` in this approach:
+    1, when `data` is a distributed dataset (Internal DataFrame/Spark DataFrame/
+    pandas-on-Spark DataFrame/pandas-on-Spark Series), it will first parallize
+    the `index` if necessary, and then try to combine the `data` and `index`;
+    Note that in this case `compute.ops_on_diff_frames` should be turned on;
+    2, when `data` is a local dataset (Pandas DataFrame/numpy ndarray/list/etc),
+    it will first collect the `index` to driver if necessary, and then apply
+    the `Pandas.DataFrame(...)` creation internally;

Review Comment:
   I guess we need to indent the body of the `versionchanged` directive?
   
   ```py
   .. versionchanged:: 3.4.0
      Since 3.4.0, ...
   ```



##########
python/pyspark/pandas/frame.py:
##########
@@ -411,56 +419,154 @@ class DataFrame(Frame, Generic[T]):
 
     Constructing DataFrame from numpy ndarray:
 
-    >>> df2 = ps.DataFrame(np.random.randint(low=0, high=10, size=(5, 5)),
-    ...                    columns=['a', 'b', 'c', 'd', 'e'])
-    >>> df2  # doctest: +SKIP
+    >>> import numpy as np
+    >>> ps.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     columns=['a', 'b', 'c', 'd', 'e'])
+       a  b  c  d  e
+    0  1  2  3  4  5
+    1  6  7  8  9  0
+
+    Constructing DataFrame from numpy ndarray with Pandas index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+
+    >>> ps.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     index=pd.Index([1, 4]), columns=['a', 'b', 'c', 'd', 'e'])
        a  b  c  d  e
-    0  3  1  4  9  8
-    1  4  8  4  8  4
-    2  7  6  5  6  7
-    3  8  7  9  1  0
-    4  2  5  4  3  9
+    1  1  2  3  4  5
+    4  6  7  8  9  0
+
+    Constructing DataFrame from numpy ndarray with pandas-on-Spark index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+    >>> ps.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     index=ps.Index([1, 4]), columns=['a', 'b', 'c', 'd', 'e'])
+       a  b  c  d  e
+    1  1  2  3  4  5
+    4  6  7  8  9  0
+
+    Constructing DataFrame from Pandas DataFrame with Pandas index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+    >>> pdf = pd.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     columns=['a', 'b', 'c', 'd', 'e'])
+    >>> ps.DataFrame(data=pdf, index=pd.Index([1, 4]))
+         a    b    c    d    e
+    1  6.0  7.0  8.0  9.0  0.0
+    4  NaN  NaN  NaN  NaN  NaN
+
+    Constructing DataFrame from Pandas DataFrame with pandas-on-Spark index:
+
+    >>> import numpy as np
+    >>> import pandas as pd
+    >>> pdf = pd.DataFrame(data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 0]]),
+    ...     columns=['a', 'b', 'c', 'd', 'e'])
+    >>> ps.DataFrame(data=pdf, index=ps.Index([1, 4]))
+         a    b    c    d    e
+    1  6.0  7.0  8.0  9.0  0.0
+    4  NaN  NaN  NaN  NaN  NaN
+
+    Constructing DataFrame from Spark DataFrame with Pandas index:
+
+    >>> import pandas as pd
+    >>> sdf = spark.createDataFrame([("Data", 1), ("Bricks", 2)], ["x", "y"])
+    >>> ps.DataFrame(data=sdf, index=pd.Index([0, 1, 2]))
+    Traceback (most recent call last):
+      ...
+    ValueError: Cannot combine the series or dataframe...'compute.ops_on_diff_frames' option.
+
+    Need to enable 'compute.ops_on_diff_frames' to combine SparkDataFrame and Pandas index
+
+    >>> with ps.option_context("compute.ops_on_diff_frames", True):
+    ...     ps.DataFrame(data=sdf, index=pd.Index([0, 1, 2]))
+            x    y
+    0    Data  1.0
+    1  Bricks  2.0
+    2    None  NaN
+
+    Constructing DataFrame from Spark DataFrame with pandas-on-Spark index:
+
+    >>> import pandas as pd
+    >>> sdf = spark.createDataFrame([("Data", 1), ("Bricks", 2)], ["x", "y"])
+    >>> ps.DataFrame(data=sdf, index=ps.Index([0, 1, 2]))
+    Traceback (most recent call last):
+      ...
+    ValueError: Cannot combine the series or dataframe...'compute.ops_on_diff_frames' option.
+
+    Need to enable 'compute.ops_on_diff_frames' to combine SparkDataFrame and Pandas index
+
+    >>> with ps.option_context("compute.ops_on_diff_frames", True):
+    ...     ps.DataFrame(data=sdf, index=ps.Index([0, 1, 2]))
+            x    y
+    0    Data  1.0
+    1  Bricks  2.0
+    2    None  NaN
     """
 
     def __init__(  # type: ignore[no-untyped-def]
         self, data=None, index=None, columns=None, dtype=None, copy=False
     ):
+        index_assigned = False
         if isinstance(data, InternalFrame):
-            assert index is None
             assert columns is None
             assert dtype is None
             assert not copy
-            internal = data
+            if index is None:
+                internal = data
         elif isinstance(data, SparkDataFrame):
-            assert index is None
             assert columns is None
             assert dtype is None
             assert not copy
-            internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+            if index is None:
+                internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+        elif isinstance(data, ps.DataFrame):
+            assert columns is None
+            assert dtype is None
+            assert not copy
+            if index is None:
+                internal = data._internal.resolved_copy
         elif isinstance(data, ps.Series):
-            assert index is None
             assert columns is None
             assert dtype is None
             assert not copy
-            data = data.to_frame()
-            internal = data._internal
+            if index is None:
+                internal = data.to_frame()._internal.resolved_copy
         else:
-            if isinstance(data, pd.DataFrame):
-                assert index is None
-                assert columns is None
-                assert dtype is None
-                assert not copy
-                pdf = data
-            else:
-                from pyspark.pandas.indexes.base import Index
+            from pyspark.pandas.indexes.base import Index
 
-                if isinstance(index, Index):
-                    raise TypeError(
-                        "The given index cannot be a pandas-on-Spark index. "
-                        "Try pandas index or array-like."
-                    )
-                pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
+            if index is not None and isinstance(index, Index):
+                # with local data, collect ps.Index to driver
+                # to avoid mismatched results between
+                # ps.DataFrame([1, 2], index=ps.Index([1, 2]))
+                # and
+                # pd.DataFrame([1, 2], index=pd.Index([1, 2]))
+                index = index.to_pandas()

Review Comment:
   Should this be `_to_pandas()` to avoid warnings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org