Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/09/07 06:11:03 UTC

[GitHub] [spark] itholic commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

itholic commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r703144155



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

Review comment:
       I think pandas doesn't have such a parameter as `return_type`?
   
   <img width="446" alt="Screen Shot 2021-09-07 at 12 04 53 PM" src="https://user-images.githubusercontent.com/44108233/132277894-c8973344-efa2-4365-a3e2-9bdf258f703f.png">
   
   
   We should avoid exposing a parameter that is only used for internal purposes.
   
   Rather, we could use `infer_pd_series_spark_type` to infer the return type for `pandas_udf` as below:
   
   ```python
   >>> from pyspark.pandas.typedef import infer_pd_series_spark_type
   >>> infer_pd_series_spark_type(self, self.dtype)
   DoubleType
   ```
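
   For illustration, a minimal runnable sketch of that inference on a plain pandas Series (assuming the combined result keeps the dtype of the left operand, which may not hold for an arbitrary `func`):
   
   ```python
   >>> import pandas as pd
   >>> from pyspark.pandas.typedef import infer_pd_series_spark_type
   >>> pser = pd.Series([330.0, 160.0], index=["falcon", "eagle"])
   >>> spark_return_type = infer_pd_series_spark_type(pser, pser.dtype)
   >>> spark_return_type
   DoubleType
   >>> # the inferred DataType could then be passed to pandas_udf(returnType=spark_return_type)
   ```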

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+
+        internal = InternalFrame(
+            spark_frame=combined._internal.spark_frame.select(
+                *combined._internal.index_spark_columns,
+                wrapped_func(*combined._internal.data_spark_columns),
+                NATURAL_ORDER_COLUMN_NAME
+            ),
+            index_spark_columns=combined._internal.index_spark_columns,
+            column_labels=self._internal.column_labels if np.isscalar(other) else [None],
+            column_label_names=[None],
+        )
+        return first_series(DataFrame(internal))
+
+    def _combine_frame_with_fill_value(self, other: "Series", fill_value: Any) -> DataFrame:
+        return combine_frames(
+            self._fill_value_for_missing_index(other, fill_value=fill_value),
+            other._fill_value_for_missing_index(self, fill_value=fill_value),
+        )
+
+    def _fill_value_for_missing_index(self, other: "Series", fill_value: Any) -> DataFrame:
+        filled_self = self._internal.spark_frame.select(
+            *self._internal.index_spark_column_names, *self._internal.data_spark_column_names
+        ).union(
+            other.index.difference(self.index)._internal.spark_frame.select(
+                *self._internal.index_spark_column_names, F.lit(fill_value)
+            )
+        )
+        return ps.DataFrame(
+            InternalFrame(
+                spark_frame=filled_self,
+                index_spark_columns=[
+                    scol_for(filled_self, col) for col in self._internal.index_spark_column_names
+                ],
+                index_names=self._internal.index_names,
+            )
+        )

Review comment:
       We can also leverage `with_new_sdf` here.
   
   ```python
   DataFrame(self._internal.with_new_sdf(filled_self))
   ```
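
   Applied to the helper above, `_fill_value_for_missing_index` could then look roughly like this (just a sketch; it assumes the unioned frame keeps the same index and data column names recorded in `self._internal`):
   
   ```python
       def _fill_value_for_missing_index(self, other: "Series", fill_value: Any) -> DataFrame:
           filled_self = self._internal.spark_frame.select(
               *self._internal.index_spark_column_names, *self._internal.data_spark_column_names
           ).union(
               other.index.difference(self.index)._internal.spark_frame.select(
                   *self._internal.index_spark_column_names, F.lit(fill_value)
               )
           )
           # with_new_sdf reuses the index names and column metadata from self._internal,
           # so we don't have to rebuild them by hand.
           return DataFrame(self._internal.with_new_sdf(filled_self))
   ```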

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]

Review comment:
       If `self` and `other` have the same anchor, we don't need to create a combined DataFrame.
   
   How about
   
   ```python
           elif same_anchor(self, other):
               scol = wrapped_func(self.spark.column, other.spark.column)
               internal = self._internal.with_new_spark_column(self._column_label, scol)
               return first_series(DataFrame(internal))
   ```
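
   Note that for this early return to work, the `wrapped_func` pandas UDF that is currently defined after the branching would have to be moved above this branch, since it is referenced here.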

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+
+        internal = InternalFrame(
+            spark_frame=combined._internal.spark_frame.select(
+                *combined._internal.index_spark_columns,
+                wrapped_func(*combined._internal.data_spark_columns),
+                NATURAL_ORDER_COLUMN_NAME
+            ),
+            index_spark_columns=combined._internal.index_spark_columns,
+            column_labels=self._internal.column_labels if np.isscalar(other) else [None],
+            column_label_names=[None],
+        )

Review comment:
       We can create the new `InternalFrame` by leveraging `self._internal.with_new_sdf` to keep the metadata from `self`.
   
   How about
   
   ```python
           scol = wrapped_func(*combined._internal.data_spark_columns)
           combined_sdf = combined._internal.spark_frame.select(
               *combined._internal.index_spark_columns,
               scol.alias(self._internal.spark_column_name_for(self.spark.column)),
               NATURAL_ORDER_COLUMN_NAME
           )
           internal = self._internal.with_new_sdf(combined_sdf)
   ```
   
   ?
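
   As with the `_fill_value_for_missing_index` comment above, going through `self._internal.with_new_sdf` keeps the column labels, index names, and dtypes recorded for `self`, instead of rebuilding an `InternalFrame` with `column_labels=[None]` and `column_label_names=[None]`.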

##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+
+        internal = InternalFrame(
+            spark_frame=combined._internal.spark_frame.select(
+                *combined._internal.index_spark_columns,
+                wrapped_func(*combined._internal.data_spark_columns),
+                NATURAL_ORDER_COLUMN_NAME
+            ),
+            index_spark_columns=combined._internal.index_spark_columns,
+            column_labels=self._internal.column_labels if np.isscalar(other) else [None],
+            column_label_names=[None],

Review comment:
       We should keep the name when `other` has the same name, or when `other` is a scalar.
   
   1. When `other` has the same name:
   ```python
   >>> self
   falcon    330.0
   eagle     160.0
   Name: pandas-on-Spark, dtype: float64
   >>> other
   falcon    345.0
   eagle     200.0
   duck       30.0
   Name: pandas-on-Spark, dtype: float64
   
   >>> self.to_pandas().combine(other.to_pandas(), max)
   duck        NaN
   eagle     200.0
   falcon    345.0
   Name: pandas-on-Spark, dtype: float64  # The name is kept
   ```
   
   2. When `other` is a scalar:
   ```python
   >>> self.to_pandas().combine(3, max)
   falcon    330.0
   eagle     160.0
   Name: pandas-on-Spark, dtype: float64
   ```
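
   One possible way to follow that behavior (only a sketch, not necessarily how the PR should do it) would be to choose the labels conditionally:
   
   ```python
           keep_name = np.isscalar(other) or self.name == other.name
           column_labels = self._internal.column_labels if keep_name else [None]
   ```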




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


