You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/08/23 01:09:17 UTC
[spark] branch master updated: [SPARK-36470][PYTHON] Implement
`CategoricalIndex.map` and `DatetimeIndex.map`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0b6af46 [SPARK-36470][PYTHON] Implement `CategoricalIndex.map` and `DatetimeIndex.map`
0b6af46 is described below
commit 0b6af464dc669b72948d8848375ed414e2d7f1c8
Author: Xinrong Meng <xi...@databricks.com>
AuthorDate: Mon Aug 23 10:08:40 2021 +0900
[SPARK-36470][PYTHON] Implement `CategoricalIndex.map` and `DatetimeIndex.map`
### What changes were proposed in this pull request?
Implement `CategoricalIndex.map` and `DatetimeIndex.map`
`MultiIndex.map` cannot be implemented in the same way as the `map` of other indexes. It should be taken care of separately if necessary.
### Why are the changes needed?
Mapping values using input correspondence is a common operation that is supported in pandas. We shall support that as well.
### Does this PR introduce _any_ user-facing change?
Yes. `CategoricalIndex.map` and `DatetimeIndex.map` can be used now.
- CategoricalIndex.map
```py
>>> idx = ps.CategoricalIndex(['a', 'b', 'c'])
>>> idx
CategoricalIndex(['a', 'b', 'c'], categories=['a', 'b', 'c'], ordered=False, dtype='category')
>>> idx.map(lambda x: x.upper())
CategoricalIndex(['A', 'B', 'C'], categories=['A', 'B', 'C'], ordered=False, dtype='category')
>>> pser = pd.Series([1, 2, 3], index=pd.CategoricalIndex(['a', 'b', 'c'], ordered=True))
>>> idx.map(pser)
CategoricalIndex([1, 2, 3], categories=[1, 2, 3], ordered=True, dtype='category')
>>> idx.map({'a': 'first', 'b': 'second', 'c': 'third'})
CategoricalIndex(['first', 'second', 'third'], categories=['first', 'second', 'third'], ordered=False, dtype='category')
```
- DatetimeIndex.map
```py
>>> pidx = pd.date_range(start="2020-08-08", end="2020-08-10")
>>> psidx = ps.from_pandas(pidx)
>>> mapper_dict = {
... datetime.datetime(2020, 8, 8): datetime.datetime(2021, 8, 8),
... datetime.datetime(2020, 8, 9): datetime.datetime(2021, 8, 9),
... }
>>> psidx.map(mapper_dict)
DatetimeIndex(['2021-08-08', '2021-08-09', 'NaT'], dtype='datetime64[ns]', freq=None)
>>> mapper_pser = pd.Series([1, 2, 3], index=pidx)
>>> psidx.map(mapper_pser)
Int64Index([1, 2, 3], dtype='int64')
>>> psidx
DatetimeIndex(['2020-08-08', '2020-08-09', '2020-08-10'], dtype='datetime64[ns]', freq=None)
>>> psidx.map(lambda x: x.strftime("%B %d, %Y, %r"))
Index(['August 08, 2020, 12:00:00 AM', 'August 09, 2020, 12:00:00 AM',
'August 10, 2020, 12:00:00 AM'],
dtype='object')
```
### How was this patch tested?
Unit tests.
Closes #33756 from xinrong-databricks/other_indexes_map.
Authored-by: Xinrong Meng <xi...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../source/reference/pyspark.pandas/indexing.rst | 1 +
python/pyspark/pandas/indexes/category.py | 88 ++++++++++++++++++----
python/pyspark/pandas/indexes/datetimes.py | 9 +--
python/pyspark/pandas/missing/indexes.py | 7 --
python/pyspark/pandas/tests/indexes/test_base.py | 40 ----------
.../pyspark/pandas/tests/indexes/test_category.py | 68 +++++++++++++++++
.../pyspark/pandas/tests/indexes/test_datetime.py | 19 +++++
python/pyspark/pandas/usage_logging/__init__.py | 2 -
8 files changed, 161 insertions(+), 73 deletions(-)
diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst b/python/docs/source/reference/pyspark.pandas/indexing.rst
index 9d53f00..a1ba8b0 100644
--- a/python/docs/source/reference/pyspark.pandas/indexing.rst
+++ b/python/docs/source/reference/pyspark.pandas/indexing.rst
@@ -183,6 +183,7 @@ Categorical components
CategoricalIndex.set_categories
CategoricalIndex.as_ordered
CategoricalIndex.as_unordered
+ CategoricalIndex.map
.. _api.multiindex:
diff --git a/python/pyspark/pandas/indexes/category.py b/python/pyspark/pandas/indexes/category.py
index 193c126..908f56b 100644
--- a/python/pyspark/pandas/indexes/category.py
+++ b/python/pyspark/pandas/indexes/category.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from functools import partial
from typing import Any, Callable, List, Optional, Union, cast, no_type_check
import pandas as pd
@@ -23,7 +22,6 @@ from pandas.api.types import is_hashable, CategoricalDtype
from pyspark import pandas as ps
from pyspark.pandas.indexes.base import Index
from pyspark.pandas.internal import InternalField
-from pyspark.pandas.missing.indexes import MissingPandasLikeCategoricalIndex
from pyspark.pandas.series import Series
from pyspark.sql.types import StructField
@@ -633,21 +631,79 @@ class CategoricalIndex(Index):
self.to_series().cat.set_categories(new_categories, ordered=ordered, rename=rename)
).rename(self.name)
- def __getattr__(self, item: str) -> Any:
- if hasattr(MissingPandasLikeCategoricalIndex, item):
- property_or_func = getattr(MissingPandasLikeCategoricalIndex, item)
- if isinstance(property_or_func, property):
- return property_or_func.fget(self) # type: ignore
- else:
- return partial(property_or_func, self)
- raise AttributeError("'CategoricalIndex' object has no attribute '{}'".format(item))
-
- def map(
- self,
- mapper: Union[dict, Callable[[Any], Any], pd.Series] = None,
- na_action: Optional[str] = None,
+ def map( # type: ignore[override]
+ self, mapper: Union[dict, Callable[[Any], Any], pd.Series]
) -> "Index":
- return MissingPandasLikeCategoricalIndex.map(self, mapper, na_action)
+ """
+ Map values using input correspondence (a dict, Series, or function).
+
+ Maps the values (their categories, not the codes) of the index to new
+ categories. If the mapping correspondence is one-to-one the result is a
+ `CategoricalIndex` which has the same order property as the original,
+ otherwise an `Index` is returned.
+
+ If a `dict` or `Series` is used any unmapped category is mapped to missing values.
+ Note that if this happens an `Index` will be returned.
+
+ Parameters
+ ----------
+ mapper : function, dict, or Series
+ Mapping correspondence.
+
+ Returns
+ -------
+ CategoricalIndex or Index
+ Mapped index.
+
+ See Also
+ --------
+ Index.map : Apply a mapping correspondence on an `Index`.
+ Series.map : Apply a mapping correspondence on a `Series`
+ Series.apply : Apply more complex functions on a `Series`
+
+ Examples
+ --------
+ >>> idx = ps.CategoricalIndex(['a', 'b', 'c'])
+ >>> idx # doctest: +NORMALIZE_WHITESPACE
+ CategoricalIndex(['a', 'b', 'c'],
+ categories=['a', 'b', 'c'], ordered=False, dtype='category')
+
+ >>> idx.map(lambda x: x.upper()) # doctest: +NORMALIZE_WHITESPACE
+ CategoricalIndex(['A', 'B', 'C'],
+ categories=['A', 'B', 'C'], ordered=False, dtype='category')
+
+ >>> pser = pd.Series([1, 2, 3], index=pd.CategoricalIndex(['a', 'b', 'c'], ordered=True))
+ >>> idx.map(pser) # doctest: +NORMALIZE_WHITESPACE
+ CategoricalIndex([1, 2, 3],
+ categories=[1, 2, 3], ordered=False, dtype='category')
+
+ >>> idx.map({'a': 'first', 'b': 'second', 'c': 'third'}) # doctest: +NORMALIZE_WHITESPACE
+ CategoricalIndex(['first', 'second', 'third'],
+ categories=['first', 'second', 'third'], ordered=False, dtype='category')
+
+ If the mapping is one-to-one the ordering of the categories is preserved:
+
+ >>> idx = ps.CategoricalIndex(['a', 'b', 'c'], ordered=True)
+ >>> idx # doctest: +NORMALIZE_WHITESPACE
+ CategoricalIndex(['a', 'b', 'c'],
+ categories=['a', 'b', 'c'], ordered=True, dtype='category')
+
+ >>> idx.map({'a': 3, 'b': 2, 'c': 1}) # doctest: +NORMALIZE_WHITESPACE
+ CategoricalIndex([3, 2, 1],
+ categories=[3, 2, 1], ordered=True, dtype='category')
+
+ If the mapping is not one-to-one an `Index` is returned:
+
+ >>> idx.map({'a': 'first', 'b': 'second', 'c': 'first'})
+ Index(['first', 'second', 'first'], dtype='object')
+
+ If a `dict` is used, all unmapped categories are mapped to None and
+ the result is an `Index`:
+
+ >>> idx.map({'a': 'first', 'b': 'second'})
+ Index(['first', 'second', None], dtype='object')
+ """
+ return super().map(mapper)
def _test() -> None:
diff --git a/python/pyspark/pandas/indexes/datetimes.py b/python/pyspark/pandas/indexes/datetimes.py
index 691d8f9..6998adf 100644
--- a/python/pyspark/pandas/indexes/datetimes.py
+++ b/python/pyspark/pandas/indexes/datetimes.py
@@ -16,7 +16,7 @@
#
import datetime
from functools import partial
-from typing import Any, Callable, Optional, Union, cast, no_type_check
+from typing import Any, Optional, Union, cast, no_type_check
import pandas as pd
from pandas.api.types import is_hashable
@@ -741,13 +741,6 @@ class DatetimeIndex(Index):
psdf = psdf.pandas_on_spark.apply_batch(pandas_at_time)
return ps.Index(first_series(psdf).rename(self.name))
- def map(
- self,
- mapper: Union[dict, Callable[[Any], Any], pd.Series] = None,
- na_action: Optional[str] = None,
- ) -> "Index":
- return MissingPandasLikeDatetimeIndex.map(self, mapper, na_action)
-
def disallow_nanoseconds(freq: Union[str, DateOffset]) -> None:
if freq in ["N", "ns"]:
diff --git a/python/pyspark/pandas/missing/indexes.py b/python/pyspark/pandas/missing/indexes.py
index 90e0c3e..07861fe 100644
--- a/python/pyspark/pandas/missing/indexes.py
+++ b/python/pyspark/pandas/missing/indexes.py
@@ -117,13 +117,6 @@ class MissingPandasLikeDatetimeIndex(MissingPandasLikeIndex):
to_pydatetime = _unsupported_function("to_pydatetime", cls="DatetimeIndex")
mean = _unsupported_function("mean", cls="DatetimeIndex")
std = _unsupported_function("std", cls="DatetimeIndex")
- map = _unsupported_function("map", cls="DatetimeIndex")
-
-
-class MissingPandasLikeCategoricalIndex(MissingPandasLikeIndex):
-
- # Functions
- map = _unsupported_function("map", cls="CategoricalIndex")
class MissingPandasLikeMultiIndex(object):
diff --git a/python/pyspark/pandas/tests/indexes/test_base.py b/python/pyspark/pandas/tests/indexes/test_base.py
index 170249a..605e3f8 100644
--- a/python/pyspark/pandas/tests/indexes/test_base.py
+++ b/python/pyspark/pandas/tests/indexes/test_base.py
@@ -26,7 +26,6 @@ import pandas as pd
import pyspark.pandas as ps
from pyspark.pandas.exceptions import PandasNotImplementedError
from pyspark.pandas.missing.indexes import (
- MissingPandasLikeCategoricalIndex,
MissingPandasLikeDatetimeIndex,
MissingPandasLikeIndex,
MissingPandasLikeMultiIndex,
@@ -515,29 +514,6 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
):
getattr(psdf.set_index("c").index, name)()
- # CategoricalIndex functions
- missing_functions = inspect.getmembers(
- MissingPandasLikeCategoricalIndex, inspect.isfunction
- )
- unsupported_functions = [
- name for (name, type_) in missing_functions if type_.__name__ == "unsupported_function"
- ]
- for name in unsupported_functions:
- with self.assertRaisesRegex(
- PandasNotImplementedError,
- "method.*Index.*{}.*not implemented( yet\\.|\\. .+)".format(name),
- ):
- getattr(psdf.set_index("d").index, name)()
-
- deprecated_functions = [
- name for (name, type_) in missing_functions if type_.__name__ == "deprecated_function"
- ]
- for name in deprecated_functions:
- with self.assertRaisesRegex(
- PandasNotImplementedError, "method.*Index.*{}.*is deprecated".format(name)
- ):
- getattr(psdf.set_index("d").index, name)()
-
# Index properties
missing_properties = inspect.getmembers(
MissingPandasLikeIndex, lambda o: isinstance(o, property)
@@ -608,22 +584,6 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils):
):
getattr(psdf.set_index("c").index, name)
- # CategoricalIndex properties
- missing_properties = inspect.getmembers(
- MissingPandasLikeCategoricalIndex, lambda o: isinstance(o, property)
- )
- unsupported_properties = [
- name
- for (name, type_) in missing_properties
- if type_.fget.__name__ == "unsupported_property"
- ]
- for name in unsupported_properties:
- with self.assertRaisesRegex(
- PandasNotImplementedError,
- "property.*Index.*{}.*not implemented( yet\\.|\\. .+)".format(name),
- ):
- getattr(psdf.set_index("d").index, name)
-
def test_index_has_duplicates(self):
indexes = [("a", "b", "c"), ("a", "a", "c"), (1, 3, 3), (1, 2, 3)]
names = [None, "ks", "ks", None]
diff --git a/python/pyspark/pandas/tests/indexes/test_category.py b/python/pyspark/pandas/tests/indexes/test_category.py
index 69d4667..8393b31 100644
--- a/python/pyspark/pandas/tests/indexes/test_category.py
+++ b/python/pyspark/pandas/tests/indexes/test_category.py
@@ -361,6 +361,74 @@ class CategoricalIndexTest(PandasOnSparkTestCase, TestUtils):
lambda: psidx.set_categories(["a", "c", "b", "o"], inplace=True),
)
+ def test_map(self):
+ pidxs = [pd.CategoricalIndex([1, 2, 3]), pd.CategoricalIndex([1, 2, 3], ordered=True)]
+ psidxs = [ps.from_pandas(pidx) for pidx in pidxs]
+
+ for pidx, psidx in zip(pidxs, psidxs):
+
+ # Apply dict
+ self.assert_eq(
+ pidx.map({1: "one", 2: "two", 3: "three"}),
+ psidx.map({1: "one", 2: "two", 3: "three"}),
+ )
+ self.assert_eq(
+ pidx.map({1: "one", 2: "two", 3: "one"}),
+ psidx.map({1: "one", 2: "two", 3: "one"}),
+ )
+ self.assert_eq(
+ pidx.map({1: "one", 2: "two"}),
+ psidx.map({1: "one", 2: "two"}),
+ )
+ self.assert_eq(
+ pidx.map({1: "one", 2: "two"}),
+ psidx.map({1: "one", 2: "two"}),
+ )
+ self.assert_eq(
+ pidx.map({1: 10, 2: 20}),
+ psidx.map({1: 10, 2: 20}),
+ )
+
+ # Apply lambda
+ self.assert_eq(
+ pidx.map(lambda id: id + 1),
+ psidx.map(lambda id: id + 1),
+ )
+ self.assert_eq(
+ pidx.map(lambda id: id + 1.1),
+ psidx.map(lambda id: id + 1.1),
+ )
+ self.assert_eq(
+ pidx.map(lambda id: "{id} + 1".format(id=id)),
+ psidx.map(lambda id: "{id} + 1".format(id=id)),
+ )
+
+ # Apply series
+ pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
+ self.assert_eq(
+ pidx.map(pser),
+ psidx.map(pser),
+ )
+ pser = pd.Series(["one", "two", "three"])
+ self.assert_eq(
+ pidx.map(pser),
+ psidx.map(pser),
+ )
+ self.assert_eq(
+ pidx.map(pser),
+ psidx.map(pser),
+ )
+ pser = pd.Series([1, 2, 3])
+ self.assert_eq(
+ pidx.map(pser),
+ psidx.map(pser),
+ )
+
+ self.assertRaises(
+ TypeError,
+ lambda: psidx.map({1: 1, 2: 2.0, 3: "three"}),
+ )
+
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/tests/indexes/test_datetime.py b/python/pyspark/pandas/tests/indexes/test_datetime.py
index 380b481..e3bf14e 100644
--- a/python/pyspark/pandas/tests/indexes/test_datetime.py
+++ b/python/pyspark/pandas/tests/indexes/test_datetime.py
@@ -221,6 +221,25 @@ class DatetimeIndexTest(PandasOnSparkTestCase, TestUtils):
self.assertRaisesRegex(TypeError, expected_err_msg, lambda: psidx - other)
self.assertRaises(NotImplementedError, lambda: py_datetime - psidx)
+ def test_map(self):
+ for psidx, pidx in self.idx_pairs:
+ self.assert_eq(psidx.map(lambda x: x.normalize()), pidx.map(lambda x: x.normalize()))
+ self.assert_eq(
+ psidx.map(lambda x: x.strftime("%B %d, %Y, %r")),
+ pidx.map(lambda x: x.strftime("%B %d, %Y, %r")),
+ )
+
+ pidx = pd.date_range(start="2020-08-08", end="2020-08-10")
+ psidx = ps.from_pandas(pidx)
+ mapper_dict = {
+ datetime.datetime(2020, 8, 8): datetime.datetime(2021, 8, 8),
+ datetime.datetime(2020, 8, 9): datetime.datetime(2021, 8, 9),
+ }
+ self.assert_eq(psidx.map(mapper_dict), pidx.map(mapper_dict))
+
+ mapper_pser = pd.Series([1, 2, 3], index=pidx)
+ self.assert_eq(psidx.map(mapper_pser), pidx.map(mapper_pser))
+
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/usage_logging/__init__.py b/python/pyspark/pandas/usage_logging/__init__.py
index 3e0fa7b..9293534b 100644
--- a/python/pyspark/pandas/usage_logging/__init__.py
+++ b/python/pyspark/pandas/usage_logging/__init__.py
@@ -41,7 +41,6 @@ from pyspark.pandas.missing.groupby import (
MissingPandasLikeSeriesGroupBy,
)
from pyspark.pandas.missing.indexes import (
- MissingPandasLikeCategoricalIndex,
MissingPandasLikeDatetimeIndex,
MissingPandasLikeIndex,
MissingPandasLikeMultiIndex,
@@ -157,7 +156,6 @@ def attach(logger_module: Union[str, ModuleType]) -> None:
(pd.Series, MissingPandasLikeSeries),
(pd.Index, MissingPandasLikeIndex),
(pd.MultiIndex, MissingPandasLikeMultiIndex),
- (pd.CategoricalIndex, MissingPandasLikeCategoricalIndex),
(pd.DatetimeIndex, MissingPandasLikeDatetimeIndex),
(pd.core.groupby.DataFrameGroupBy, MissingPandasLikeDataFrameGroupBy),
(pd.core.groupby.SeriesGroupBy, MissingPandasLikeSeriesGroupBy),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org