You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/24 06:23:23 UTC
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37564: [SPARK-40135][PS] Support `data` mixed with `index` in DataFrame creation
HyukjinKwon commented on code in PR #37564:
URL: https://github.com/apache/spark/pull/37564#discussion_r953393376
##########
python/pyspark/pandas/frame.py:
##########
@@ -425,42 +423,64 @@ class DataFrame(Frame, Generic[T]):
def __init__( # type: ignore[no-untyped-def]
self, data=None, index=None, columns=None, dtype=None, copy=False
):
+ index_assigned = False
if isinstance(data, InternalFrame):
- assert index is None
assert columns is None
assert dtype is None
assert not copy
- internal = data
+ if index is None:
+ internal = data
elif isinstance(data, SparkDataFrame):
- assert index is None
assert columns is None
assert dtype is None
assert not copy
- internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+ if index is None:
+ internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+ elif isinstance(data, ps.DataFrame):
+ assert columns is None
+ assert dtype is None
+ assert not copy
+ if index is None:
+ internal = data._internal.resolved_copy
elif isinstance(data, ps.Series):
- assert index is None
assert columns is None
assert dtype is None
assert not copy
- data = data.to_frame()
- internal = data._internal
+ if index is None:
+ internal = data.to_frame()._internal.resolved_copy
else:
- if isinstance(data, pd.DataFrame):
- assert index is None
- assert columns is None
- assert dtype is None
- assert not copy
- pdf = data
- else:
- from pyspark.pandas.indexes.base import Index
+ from pyspark.pandas.indexes.base import Index
- if isinstance(index, Index):
- raise TypeError(
- "The given index cannot be a pandas-on-Spark index. "
- "Try pandas index or array-like."
- )
- pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
+ if index is not None and isinstance(index, Index):
+ # with local data, collect ps.Index to driver
+ # to avoid mismatched results between
+ # ps.DataFrame([1, 2], index=ps.Index([1, 2]))
+ # and
+ # pd.DataFrame([1, 2], index=pd.Index([1, 2]))
+ index = index.to_pandas()
+
+ pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
internal = InternalFrame.from_pandas(pdf)
+ index_assigned = True
+
+ if index is not None and not index_assigned:
+ data_df = ps.DataFrame(data=data, index=None, columns=columns, dtype=dtype, copy=copy)
+ index_ps = ps.Index(index)
+ index_df = index_ps.to_frame()
+
+ # drop un-matched rows in `data`
+ # note that `combine_frames` can not work with a MultiIndex for now
+ combined = combine_frames(data_df, index_df, how="right")
+ combined_labels = combined._internal.column_labels
+ index_labels = [label for label in combined_labels if label[0] == "that"]
+ combined = combined.set_index(index_labels)
+
+ combined._internal._column_labels = data_df._internal.column_labels
Review Comment:
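Could we use `combined_internal.copy(...)` instead? That is, rather than assigning to the private `_column_labels` attribute of `combined._internal` in place, build a new `InternalFrame` via `combined._internal.copy(column_labels=data_df._internal.column_labels)`.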
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org