Posted to reviews@spark.apache.org by "ueshin (via GitHub)" <gi...@apache.org> on 2023/05/23 21:22:18 UTC

[GitHub] [spark] ueshin commented on a diff in pull request #41240: [SPARK-43545][SQL][PYTHON] Support nested timestamp type

ueshin commented on code in PR #41240:
URL: https://github.com/apache/spark/pull/41240#discussion_r1203029027


##########
python/pyspark/sql/pandas/conversion.py:
##########
@@ -375,22 +379,105 @@ def _convert_from_pandas(
         assert isinstance(self, SparkSession)
 
         if timezone is not None:
-            from pyspark.sql.pandas.types import _check_series_convert_timestamps_tz_local
+            from pyspark.sql.pandas.types import (
+                _check_series_convert_timestamps_tz_local,
+                _get_local_timezone,
+            )
             from pandas.core.dtypes.common import is_datetime64tz_dtype, is_timedelta64_dtype
 
             copied = False
             if isinstance(schema, StructType):
-                for field in schema:
-                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
-                    if isinstance(field.dataType, TimestampType):
-                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
-                        if s is not pdf[field.name]:
-                            if not copied:
-                                # Copy once if the series is modified to prevent the original
-                                # Pandas DataFrame from being updated
-                                pdf = pdf.copy()
-                                copied = True
-                            pdf[field.name] = s
+
+                def _create_converter(data_type: DataType) -> Callable[[pd.Series], pd.Series]:
+                    if isinstance(data_type, TimestampType):
+
+                        def correct_timestamp(pser: pd.Series) -> pd.Series:
+                            return _check_series_convert_timestamps_tz_local(pser, timezone)
+
+                        return correct_timestamp
+
+                    def _converter(dt: DataType) -> Optional[Callable[[Any], Any]]:
+                        if isinstance(dt, ArrayType):
+                            element_conv = _converter(dt.elementType) or (lambda x: x)
+
+                            def convert_array(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                else:
+                                    return [element_conv(v) for v in value]
+
+                            return convert_array
+
+                        elif isinstance(dt, MapType):
+                            key_conv = _converter(dt.keyType) or (lambda x: x)
+                            value_conv = _converter(dt.valueType) or (lambda x: x)
+
+                            def convert_map(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                else:
+                                    return {key_conv(k): value_conv(v) for k, v in value.items()}
+
+                            return convert_map
+
+                        elif isinstance(dt, StructType):
+                            field_names = dt.names
+                            dedup_field_names = _dedup_names(field_names)
+                            field_convs = [
+                                _converter(f.dataType) or (lambda x: x) for f in dt.fields
+                            ]
+
+                            def convert_struct(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                elif isinstance(value, dict):
+                                    _values = [
+                                        field_convs[i](value.get(name, None))
+                                        for i, name in enumerate(dedup_field_names)
+                                    ]
+                                    return _create_row(field_names, _values)
+                                else:
+                                    _values = [
+                                        field_convs[i](value[i]) for i, name in enumerate(value)
+                                    ]
+                                    return _create_row(field_names, _values)
+
+                            return convert_struct
+
+                        elif isinstance(dt, TimestampType):
+
+                            def convert_timestamp(value: Any) -> Any:
+                                if value is None:
+                                    return None
+                                else:
+                                    return (
+                                        pd.Timestamp(value)
+                                        .tz_localize(timezone, ambiguous=False)  # type: ignore

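For reference, a minimal standalone sketch of the recursive converter dispatch this hunk introduces. It is illustrative only: `convert_nested` is a made-up name, the hard-coded "UTC" zone stands in for the PR's session-time-zone handling, and the real change also wires these converters into per-column Series processing via `_check_series_convert_timestamps_tz_local`.

```python
from typing import Any, Callable, Optional

import pandas as pd
from pyspark.sql.types import ArrayType, DataType, MapType, TimestampType


def convert_nested(dt: DataType) -> Optional[Callable[[Any], Any]]:
    # Return None when no conversion is needed, mirroring the diff's style
    # of substituting an identity function at the call site.
    if isinstance(dt, ArrayType):
        element_conv = convert_nested(dt.elementType) or (lambda x: x)
        return lambda v: None if v is None else [element_conv(e) for e in v]
    elif isinstance(dt, MapType):
        key_conv = convert_nested(dt.keyType) or (lambda x: x)
        value_conv = convert_nested(dt.valueType) or (lambda x: x)
        return lambda v: (
            None if v is None else {key_conv(k): value_conv(x) for k, x in v.items()}
        )
    elif isinstance(dt, TimestampType):
        # Localize naive timestamps; the actual PR also converts them with
        # respect to the session time zone rather than a fixed "UTC".
        return lambda v: None if v is None else pd.Timestamp(v).tz_localize("UTC")
    else:
        return None


# Example: convert every element of an array<timestamp> value.
conv = convert_nested(ArrayType(TimestampType())) or (lambda x: x)
print(conv([pd.Timestamp("2023-05-23 21:22:18"), None]))
```
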
Review Comment:
   Let me leave it without the error tag. I see a weird error in CI that I can't reproduce locally:
   
   ```
   starting mypy annotations test...
   annotations failed mypy checks:
   python/pyspark/sql/pandas/conversion.py:454: error: Unexpected keyword argument "ambiguous" for "tz_localize" of "Timestamp"; did you mean "ambigious"?  [call-arg]
   /usr/local/lib/python3.9/dist-packages/pandas/_libs/tslibs/timestamps.pyi:38: note: "tz_localize" of "Timestamp" defined here
   python/pyspark/sql/pandas/conversion.py:456: error: unused "type: ignore" comment
   Found 2 errors in 1 file (checked 512 source files)
   ```
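   
   To illustrate the trade-off (a hedged sketch, not code from the PR; `timezone` and `value` below are placeholders):
   
   ```python
   import pandas as pd

   timezone = "America/Los_Angeles"  # placeholder session time zone
   value = "2023-05-23 21:22:18"     # placeholder timestamp string

   # With an error tag, only the named code is suppressed; which code a given
   # pandas-stubs version reports for this call can differ between environments.
   ts = pd.Timestamp(value).tz_localize(timezone, ambiguous=False)  # type: ignore[call-arg]

   # A bare ignore suppresses whatever error mypy reports on this line,
   # regardless of its code, at the cost of being less precise.
   ts = pd.Timestamp(value).tz_localize(timezone, ambiguous=False)  # type: ignore
   ```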



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

