Posted to commits@spark.apache.org by gu...@apache.org on 2021/08/23 01:09:17 UTC

[spark] branch master updated: [SPARK-36470][PYTHON] Implement `CategoricalIndex.map` and `DatetimeIndex.map`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b6af46  [SPARK-36470][PYTHON] Implement `CategoricalIndex.map` and `DatetimeIndex.map`
0b6af46 is described below

commit 0b6af464dc669b72948d8848375ed414e2d7f1c8
Author: Xinrong Meng <xi...@databricks.com>
AuthorDate: Mon Aug 23 10:08:40 2021 +0900

    [SPARK-36470][PYTHON] Implement `CategoricalIndex.map` and `DatetimeIndex.map`
    
    ### What changes were proposed in this pull request?
    Implement `CategoricalIndex.map` and `DatetimeIndex.map`
    
    `MultiIndex.map` cannot be implemented in the same way as the `map` of the other index types, so it should be handled separately if necessary.
    
    ### Why are the changes needed?
    Mapping values using an input correspondence is a common operation supported in pandas, so pandas-on-Spark should support it as well.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. `CategoricalIndex.map` and `DatetimeIndex.map` can be used now.
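    
    The examples below assume the usual imports for pandas-on-Spark (the same setup the doctests in this patch use):
    
    ```py
    >>> import datetime
    >>> import pandas as pd
    >>> import pyspark.pandas as ps
    ```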
    
    - CategoricalIndex.map
    
    ```py
    >>> idx = ps.CategoricalIndex(['a', 'b', 'c'])
    >>> idx
    CategoricalIndex(['a', 'b', 'c'], categories=['a', 'b', 'c'], ordered=False, dtype='category')
    
    >>> idx.map(lambda x: x.upper())
    CategoricalIndex(['A', 'B', 'C'], categories=['A', 'B', 'C'], ordered=False, dtype='category')
    
    >>> pser = pd.Series([1, 2, 3], index=pd.CategoricalIndex(['a', 'b', 'c'], ordered=True))
    >>> idx.map(pser)
    CategoricalIndex([1, 2, 3], categories=[1, 2, 3], ordered=False, dtype='category')
    
    >>> idx.map({'a': 'first', 'b': 'second', 'c': 'third'})
    CategoricalIndex(['first', 'second', 'third'], categories=['first', 'second', 'third'], ordered=False, dtype='category')
    ```
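    
    A one-to-one mapping preserves the ordering of the categories, while a mapping that is not one-to-one, or one that leaves categories unmapped, falls back to a plain `Index` (these examples are taken from the docstring added in this patch):
    
    ```py
    >>> idx = ps.CategoricalIndex(['a', 'b', 'c'], ordered=True)
    >>> idx.map({'a': 3, 'b': 2, 'c': 1})
    CategoricalIndex([3, 2, 1], categories=[3, 2, 1], ordered=True, dtype='category')
    
    >>> idx.map({'a': 'first', 'b': 'second', 'c': 'first'})
    Index(['first', 'second', 'first'], dtype='object')
    
    >>> idx.map({'a': 'first', 'b': 'second'})
    Index(['first', 'second', None], dtype='object')
    ```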
    
    - DatetimeIndex.map
    
    ```py
    >>> pidx = pd.date_range(start="2020-08-08", end="2020-08-10")
    >>> psidx = ps.from_pandas(pidx)
    
    >>> mapper_dict = {
    ...   datetime.datetime(2020, 8, 8): datetime.datetime(2021, 8, 8),
    ...   datetime.datetime(2020, 8, 9): datetime.datetime(2021, 8, 9),
    ... }
    >>> psidx.map(mapper_dict)
    DatetimeIndex(['2021-08-08', '2021-08-09', 'NaT'], dtype='datetime64[ns]', freq=None)
    
    >>> mapper_pser = pd.Series([1, 2, 3], index=pidx)
    >>> psidx.map(mapper_pser)
    Int64Index([1, 2, 3], dtype='int64')
    >>> psidx
    DatetimeIndex(['2020-08-08', '2020-08-09', '2020-08-10'], dtype='datetime64[ns]', freq=None)
    
    >>> psidx.map(lambda x: x.strftime("%B %d, %Y, %r"))
    Index(['August 08, 2020, 12:00:00 AM', 'August 09, 2020, 12:00:00 AM',
           'August 10, 2020, 12:00:00 AM'],
          dtype='object')
    ```
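    
    One caveat: unlike pandas, the `map` implemented here takes only `mapper` (the new signature has no `na_action` parameter), so passing one fails at call time. A minimal sketch, reusing the `CategoricalIndex` from above; the exact error wording depends on the Python version:
    
    ```py
    >>> idx = ps.CategoricalIndex(['a', 'b', 'c'])
    >>> idx.map(lambda x: x.upper(), na_action='ignore')
    Traceback (most recent call last):
        ...
    TypeError: map() got an unexpected keyword argument 'na_action'
    ```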
    
    ### How was this patch tested?
    Unit tests.
    
    Closes #33756 from xinrong-databricks/other_indexes_map.
    
    Authored-by: Xinrong Meng <xi...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../source/reference/pyspark.pandas/indexing.rst   |  1 +
 python/pyspark/pandas/indexes/category.py          | 88 ++++++++++++++++++----
 python/pyspark/pandas/indexes/datetimes.py         |  9 +--
 python/pyspark/pandas/missing/indexes.py           |  7 --
 python/pyspark/pandas/tests/indexes/test_base.py   | 40 ----------
 .../pyspark/pandas/tests/indexes/test_category.py  | 68 +++++++++++++++++
 .../pyspark/pandas/tests/indexes/test_datetime.py  | 19 +++++
 python/pyspark/pandas/usage_logging/__init__.py    |  2 -
 8 files changed, 161 insertions(+), 73 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst b/python/docs/source/reference/pyspark.pandas/indexing.rst
index 9d53f00..a1ba8b0 100644
--- a/python/docs/source/reference/pyspark.pandas/indexing.rst
+++ b/python/docs/source/reference/pyspark.pandas/indexing.rst
@@ -183,6 +183,7 @@ Categorical components
    CategoricalIndex.set_categories
    CategoricalIndex.as_ordered
    CategoricalIndex.as_unordered
+   CategoricalIndex.map
 
 .. _api.multiindex:
 
diff --git a/python/pyspark/pandas/indexes/category.py b/python/pyspark/pandas/indexes/category.py
index 193c126..908f56b 100644
--- a/python/pyspark/pandas/indexes/category.py
+++ b/python/pyspark/pandas/indexes/category.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from functools import partial
 from typing import Any, Callable, List, Optional, Union, cast, no_type_check
 
 import pandas as pd
@@ -23,7 +22,6 @@ from pandas.api.types import is_hashable, CategoricalDtype
 from pyspark import pandas as ps
 from pyspark.pandas.indexes.base import Index
 from pyspark.pandas.internal import InternalField
-from pyspark.pandas.missing.indexes import MissingPandasLikeCategoricalIndex
 from pyspark.pandas.series import Series
 from pyspark.sql.types import StructField
 
@@ -633,21 +631,79 @@ class CategoricalIndex(Index):
             self.to_series().cat.set_categories(new_categories, ordered=ordered, rename=rename)
         ).rename(self.name)
 
-    def __getattr__(self, item: str) -> Any:
-        if hasattr(MissingPandasLikeCategoricalIndex, item):
-            property_or_func = getattr(MissingPandasLikeCategoricalIndex, item)
-            if isinstance(property_or_func, property):
-                return property_or_func.fget(self)  # type: ignore
-            else:
-                return partial(property_or_func, self)
-        raise AttributeError("'CategoricalIndex' object has no attribute '{}'".format(item))
-
-    def map(
-        self,
-        mapper: Union[dict, Callable[[Any], Any], pd.Series] = None,
-        na_action: Optional[str] = None,
+    def map(  # type: ignore[override]
+        self, mapper: Union[dict, Callable[[Any], Any], pd.Series]
     ) -> "Index":
-        return MissingPandasLikeCategoricalIndex.map(self, mapper, na_action)
+        """
+        Map values using input correspondence (a dict, Series, or function).
+
+        Maps the values (their categories, not the codes) of the index to new
+        categories. If the mapping correspondence is one-to-one the result is a
+        `CategoricalIndex` which has the same order property as the original,
+        otherwise an `Index` is returned.
+
+        If a `dict` or `Series` is used any unmapped category is mapped to missing values.
+        Note that if this happens an `Index` will be returned.
+
+        Parameters
+        ----------
+        mapper : function, dict, or Series
+            Mapping correspondence.
+
+        Returns
+        -------
+        CategoricalIndex or Index
+            Mapped index.
+
+        See Also
+        --------
+        Index.map : Apply a mapping correspondence on an `Index`.
+        Series.map : Apply a mapping correspondence on a `Series`
+        Series.apply : Apply more complex functions on a `Series`
+
+        Examples
+        --------
+        >>> idx = ps.CategoricalIndex(['a', 'b', 'c'])
+        >>> idx  # doctest: +NORMALIZE_WHITESPACE
+        CategoricalIndex(['a', 'b', 'c'],
+                         categories=['a', 'b', 'c'], ordered=False, dtype='category')
+
+        >>> idx.map(lambda x: x.upper())  # doctest: +NORMALIZE_WHITESPACE
+        CategoricalIndex(['A', 'B', 'C'],
+                         categories=['A', 'B', 'C'], ordered=False, dtype='category')
+
+        >>> pser = pd.Series([1, 2, 3], index=pd.CategoricalIndex(['a', 'b', 'c'], ordered=True))
+        >>> idx.map(pser)  # doctest: +NORMALIZE_WHITESPACE
+        CategoricalIndex([1, 2, 3],
+                         categories=[1, 2, 3], ordered=False, dtype='category')
+
+        >>> idx.map({'a': 'first', 'b': 'second', 'c': 'third'})  # doctest: +NORMALIZE_WHITESPACE
+        CategoricalIndex(['first', 'second', 'third'],
+                         categories=['first', 'second', 'third'], ordered=False, dtype='category')
+
+        If the mapping is one-to-one the ordering of the categories is preserved:
+
+        >>> idx = ps.CategoricalIndex(['a', 'b', 'c'], ordered=True)
+        >>> idx  # doctest: +NORMALIZE_WHITESPACE
+        CategoricalIndex(['a', 'b', 'c'],
+                         categories=['a', 'b', 'c'], ordered=True, dtype='category')
+
+        >>> idx.map({'a': 3, 'b': 2, 'c': 1})  # doctest: +NORMALIZE_WHITESPACE
+        CategoricalIndex([3, 2, 1],
+                         categories=[3, 2, 1], ordered=True, dtype='category')
+
+        If the mapping is not one-to-one an `Index` is returned:
+
+        >>> idx.map({'a': 'first', 'b': 'second', 'c': 'first'})
+        Index(['first', 'second', 'first'], dtype='object')
+
+        If a `dict` is used, all unmapped categories are mapped to None and
+        the result is an `Index`:
+
+        >>> idx.map({'a': 'first', 'b': 'second'})
+        Index(['first', 'second', None], dtype='object')
+        """
+        return super().map(mapper)
 
 
 def _test() -> None:
diff --git a/python/pyspark/pandas/indexes/datetimes.py b/python/pyspark/pandas/indexes/datetimes.py
index 691d8f9..6998adf 100644
--- a/python/pyspark/pandas/indexes/datetimes.py
+++ b/python/pyspark/pandas/indexes/datetimes.py
@@ -16,7 +16,7 @@
 #
 import datetime
 from functools import partial
-from typing import Any, Callable, Optional, Union, cast, no_type_check
+from typing import Any, Optional, Union, cast, no_type_check
 
 import pandas as pd
 from pandas.api.types import is_hashable
@@ -741,13 +741,6 @@ class DatetimeIndex(Index):
             psdf = psdf.pandas_on_spark.apply_batch(pandas_at_time)
         return ps.Index(first_series(psdf).rename(self.name))
 
-    def map(
-        self,
-        mapper: Union[dict, Callable[[Any], Any], pd.Series] = None,
-        na_action: Optional[str] = None,
-    ) -> "Index":
-        return MissingPandasLikeDatetimeIndex.map(self, mapper, na_action)
-
 
 def disallow_nanoseconds(freq: Union[str, DateOffset]) -> None:
     if freq in ["N", "ns"]:
diff --git a/python/pyspark/pandas/missing/indexes.py b/python/pyspark/pandas/missing/indexes.py
index 90e0c3e..07861fe 100644
--- a/python/pyspark/pandas/missing/indexes.py
+++ b/python/pyspark/pandas/missing/indexes.py
@@ -117,13 +117,6 @@ class MissingPandasLikeDatetimeIndex(MissingPandasLikeIndex):
     to_pydatetime = _unsupported_function("to_pydatetime", cls="DatetimeIndex")
     mean = _unsupported_function("mean", cls="DatetimeIndex")
     std = _unsupported_function("std", cls="DatetimeIndex")
-    map = _unsupported_function("map", cls="DatetimeIndex")
-
-
-class MissingPandasLikeCategoricalIndex(MissingPandasLikeIndex):
-
-    # Functions
-    map = _unsupported_function("map", cls="CategoricalIndex")
 
 
 class MissingPandasLikeMultiIndex(object):
diff --git a/python/pyspark/pandas/tests/indexes/test_base.py b/python/pyspark/pandas/tests/indexes/test_base.py
index 170249a..605e3f8 100644
--- a/python/pyspark/pandas/tests/indexes/test_base.py
+++ b/python/pyspark/pandas/tests/indexes/test_base.py
@@ -26,7 +26,6 @@ import pandas as pd
 import pyspark.pandas as ps
 from pyspark.pandas.exceptions import PandasNotImplementedError
 from pyspark.pandas.missing.indexes import (
-    MissingPandasLikeCategoricalIndex,
     MissingPandasLikeDatetimeIndex,
     MissingPandasLikeIndex,
     MissingPandasLikeMultiIndex,
@@ -515,29 +514,6 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
             ):
                 getattr(psdf.set_index("c").index, name)()
 
-        # CategoricalIndex functions
-        missing_functions = inspect.getmembers(
-            MissingPandasLikeCategoricalIndex, inspect.isfunction
-        )
-        unsupported_functions = [
-            name for (name, type_) in missing_functions if type_.__name__ == "unsupported_function"
-        ]
-        for name in unsupported_functions:
-            with self.assertRaisesRegex(
-                PandasNotImplementedError,
-                "method.*Index.*{}.*not implemented( yet\\.|\\. .+)".format(name),
-            ):
-                getattr(psdf.set_index("d").index, name)()
-
-        deprecated_functions = [
-            name for (name, type_) in missing_functions if type_.__name__ == "deprecated_function"
-        ]
-        for name in deprecated_functions:
-            with self.assertRaisesRegex(
-                PandasNotImplementedError, "method.*Index.*{}.*is deprecated".format(name)
-            ):
-                getattr(psdf.set_index("d").index, name)()
-
         # Index properties
         missing_properties = inspect.getmembers(
             MissingPandasLikeIndex, lambda o: isinstance(o, property)
@@ -608,22 +584,6 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
             ):
                 getattr(psdf.set_index("c").index, name)
 
-        # CategoricalIndex properties
-        missing_properties = inspect.getmembers(
-            MissingPandasLikeCategoricalIndex, lambda o: isinstance(o, property)
-        )
-        unsupported_properties = [
-            name
-            for (name, type_) in missing_properties
-            if type_.fget.__name__ == "unsupported_property"
-        ]
-        for name in unsupported_properties:
-            with self.assertRaisesRegex(
-                PandasNotImplementedError,
-                "property.*Index.*{}.*not implemented( yet\\.|\\. .+)".format(name),
-            ):
-                getattr(psdf.set_index("d").index, name)
-
     def test_index_has_duplicates(self):
         indexes = [("a", "b", "c"), ("a", "a", "c"), (1, 3, 3), (1, 2, 3)]
         names = [None, "ks", "ks", None]
diff --git a/python/pyspark/pandas/tests/indexes/test_category.py b/python/pyspark/pandas/tests/indexes/test_category.py
index 69d4667..8393b31 100644
--- a/python/pyspark/pandas/tests/indexes/test_category.py
+++ b/python/pyspark/pandas/tests/indexes/test_category.py
@@ -361,6 +361,74 @@ class CategoricalIndexTest(PandasOnSparkTestCase, TestUtils):
             lambda: psidx.set_categories(["a", "c", "b", "o"], inplace=True),
         )
 
+    def test_map(self):
+        pidxs = [pd.CategoricalIndex([1, 2, 3]), pd.CategoricalIndex([1, 2, 3], ordered=True)]
+        psidxs = [ps.from_pandas(pidx) for pidx in pidxs]
+
+        for pidx, psidx in zip(pidxs, psidxs):
+
+            # Apply dict
+            self.assert_eq(
+                pidx.map({1: "one", 2: "two", 3: "three"}),
+                psidx.map({1: "one", 2: "two", 3: "three"}),
+            )
+            self.assert_eq(
+                pidx.map({1: "one", 2: "two", 3: "one"}),
+                psidx.map({1: "one", 2: "two", 3: "one"}),
+            )
+            self.assert_eq(
+                pidx.map({1: "one", 2: "two"}),
+                psidx.map({1: "one", 2: "two"}),
+            )
+            self.assert_eq(
+                pidx.map({1: "one", 2: "two"}),
+                psidx.map({1: "one", 2: "two"}),
+            )
+            self.assert_eq(
+                pidx.map({1: 10, 2: 20}),
+                psidx.map({1: 10, 2: 20}),
+            )
+
+            # Apply lambda
+            self.assert_eq(
+                pidx.map(lambda id: id + 1),
+                psidx.map(lambda id: id + 1),
+            )
+            self.assert_eq(
+                pidx.map(lambda id: id + 1.1),
+                psidx.map(lambda id: id + 1.1),
+            )
+            self.assert_eq(
+                pidx.map(lambda id: "{id} + 1".format(id=id)),
+                psidx.map(lambda id: "{id} + 1".format(id=id)),
+            )
+
+            # Apply series
+            pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
+            self.assert_eq(
+                pidx.map(pser),
+                psidx.map(pser),
+            )
+            pser = pd.Series(["one", "two", "three"])
+            self.assert_eq(
+                pidx.map(pser),
+                psidx.map(pser),
+            )
+            self.assert_eq(
+                pidx.map(pser),
+                psidx.map(pser),
+            )
+            pser = pd.Series([1, 2, 3])
+            self.assert_eq(
+                pidx.map(pser),
+                psidx.map(pser),
+            )
+
+            self.assertRaises(
+                TypeError,
+                lambda: psidx.map({1: 1, 2: 2.0, 3: "three"}),
+            )
+
 
 if __name__ == "__main__":
     import unittest
diff --git a/python/pyspark/pandas/tests/indexes/test_datetime.py b/python/pyspark/pandas/tests/indexes/test_datetime.py
index 380b481..e3bf14e 100644
--- a/python/pyspark/pandas/tests/indexes/test_datetime.py
+++ b/python/pyspark/pandas/tests/indexes/test_datetime.py
@@ -221,6 +221,25 @@ class DatetimeIndexTest(PandasOnSparkTestCase, TestUtils):
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psidx - other)
             self.assertRaises(NotImplementedError, lambda: py_datetime - psidx)
 
+    def test_map(self):
+        for psidx, pidx in self.idx_pairs:
+            self.assert_eq(psidx.map(lambda x: x.normalize()), pidx.map(lambda x: x.normalize()))
+            self.assert_eq(
+                psidx.map(lambda x: x.strftime("%B %d, %Y, %r")),
+                pidx.map(lambda x: x.strftime("%B %d, %Y, %r")),
+            )
+
+        pidx = pd.date_range(start="2020-08-08", end="2020-08-10")
+        psidx = ps.from_pandas(pidx)
+        mapper_dict = {
+            datetime.datetime(2020, 8, 8): datetime.datetime(2021, 8, 8),
+            datetime.datetime(2020, 8, 9): datetime.datetime(2021, 8, 9),
+        }
+        self.assert_eq(psidx.map(mapper_dict), pidx.map(mapper_dict))
+
+        mapper_pser = pd.Series([1, 2, 3], index=pidx)
+        self.assert_eq(psidx.map(mapper_pser), pidx.map(mapper_pser))
+
 
 if __name__ == "__main__":
     import unittest
diff --git a/python/pyspark/pandas/usage_logging/__init__.py b/python/pyspark/pandas/usage_logging/__init__.py
index 3e0fa7b..9293534b 100644
--- a/python/pyspark/pandas/usage_logging/__init__.py
+++ b/python/pyspark/pandas/usage_logging/__init__.py
@@ -41,7 +41,6 @@ from pyspark.pandas.missing.groupby import (
     MissingPandasLikeSeriesGroupBy,
 )
 from pyspark.pandas.missing.indexes import (
-    MissingPandasLikeCategoricalIndex,
     MissingPandasLikeDatetimeIndex,
     MissingPandasLikeIndex,
     MissingPandasLikeMultiIndex,
@@ -157,7 +156,6 @@ def attach(logger_module: Union[str, ModuleType]) -> None:
         (pd.Series, MissingPandasLikeSeries),
         (pd.Index, MissingPandasLikeIndex),
         (pd.MultiIndex, MissingPandasLikeMultiIndex),
-        (pd.CategoricalIndex, MissingPandasLikeCategoricalIndex),
         (pd.DatetimeIndex, MissingPandasLikeDatetimeIndex),
         (pd.core.groupby.DataFrameGroupBy, MissingPandasLikeDataFrameGroupBy),
         (pd.core.groupby.SeriesGroupBy, MissingPandasLikeSeriesGroupBy),
