You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2024/01/24 09:16:21 UTC

[PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

zhengruifeng opened a new pull request, #44869:
URL: https://github.com/apache/spark/pull/44869

   ### What changes were proposed in this pull request?
   1, Introduce a basic fallback mechanism for frame methods, with a new option `compute.pandas_fallback` default false;
   2, implement `Frame.asfreq` and `Frame.asof`
   
   ### Why are the changes needed?
   for pandas parity
   
   ### Does this PR introduce _any_ user-facing change?
   yes
   
   ```
   In [1]: import pyspark.pandas as ps
      ...: import pandas as pd
      ...: 
      ...: index = pd.date_range('1/1/2000', periods=4, freq='min')
      ...: series = pd.Series([0.0, None, 2.0, 3.0], index=index)
      ...: pdf = pd.DataFrame({'s': series})
      ...: psdf = ps.from_pandas(pdf)
   
   In [2]: psdf.asfreq(freq='30s')
   ---------------------------------------------------------------------------
   PandasNotImplementedError                 Traceback (most recent call last)
   Cell In[2], line 1
   ----> 1 psdf.asfreq(freq='30s')
   
   File ~/Dev/spark/python/pyspark/pandas/missing/__init__.py:23, in unsupported_function.<locals>.unsupported_function(*args, **kwargs)
        22 def unsupported_function(*args, **kwargs):
   ---> 23     raise PandasNotImplementedError(
        24         class_name=class_name, method_name=method_name, reason=reason
        25     )
   
   PandasNotImplementedError: The method `pd.DataFrame.asfreq()` is not implemented yet.
   
   In [3]: ps.set_option("compute.pandas_fallback", True)
   
   In [4]: psdf.asfreq(freq='30s')
   /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: `asfreq` is executed in fallback mode. It loads partial data into the driver's memory to infer the schema, and loads all data into one executor's memory to compute. It should only be used if the pandas DataFrame is expected to be small.
     warnings.warn(message, PandasAPIOnSparkAdviceWarning)
   /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: If the type hints is not specified for `groupby.apply`, it is expensive to infer the data type internally.
     warnings.warn(message, PandasAPIOnSparkAdviceWarning)
   Out[4]:                                                                         
                          s
   2000-01-01 00:00:00  0.0
   2000-01-01 00:00:30  NaN
   2000-01-01 00:01:00  NaN
   2000-01-01 00:01:30  NaN
   2000-01-01 00:02:00  2.0
   2000-01-01 00:02:30  NaN
   2000-01-01 00:03:00  3.0
   ```
   
   
   ### How was this patch tested?
   added ut
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #44869: [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods
URL: https://github.com/apache/spark/pull/44869


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465819924


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"
+            tmp_idx_column_name = f"__tmp_index_col_for_frame_{method}_{uid}__"

Review Comment:
   I guess we need to handle index separate:
   1, the fallback pandas method itself may change the index (change the index column or change the values in the index);
   2, `df.groupby.apply` returns a psdf with a multi-index containing both original index and the group column, which is not needed in the fallback. e.g.
   
   ```
   In [12]: input_df
   Out[12]:
                          s  __tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__
   2000-01-01 00:00:00  0.0                                                                        0
   2000-01-01 00:01:00  NaN                                                                        0
   2000-01-01 00:02:00  2.0                                                                        0
   2000-01-01 00:03:00  3.0                                                                        0
   
   In [13]: input_df.index
   Out[13]:
   DatetimeIndex(['2000-01-01 00:00:00', '2000-01-01 00:01:00',
                  '2000-01-01 00:02:00', '2000-01-01 00:03:00'],
                 dtype='datetime64[ns]', freq=None)
   
   In [14]: output_df = input_df.groupby(tmp_agg_column_name).apply(lambda df: df)
   /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: If the type hints is not specified for `groupby.apply`, it is expensive to infer the data type internally.
     warnings.warn(message, PandasAPIOnSparkAdviceWarning)
   
   In [15]: output_df.index
   Out[15]:
   MultiIndex([(0, '2000-01-01 00:00:00'),
               (0, '2000-01-01 00:01:00'),
               (0, '2000-01-01 00:02:00'),
               (0, '2000-01-01 00:03:00')],
              names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #44869:
URL: https://github.com/apache/spark/pull/44869#issuecomment-1909740615

   thanks for reviews. The last commit only updates the todo item.
   merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #44869:
URL: https://github.com/apache/spark/pull/44869#issuecomment-1907717618

   cc @HyukjinKwon @itholic @xinrong-meng @ueshin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465697165


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"

Review Comment:
   no need to do it with uuid I believe. we can just do `verify_temp_column_name(sdf, "__tmp_agg_col_for_frame_{method}__")`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465822177


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"
+            tmp_idx_column_name = f"__tmp_index_col_for_frame_{method}_{uid}__"

Review Comment:
   so I just make such conversion:
   
   - input psdf index -> input psdf normal column -> reset pdf index -> fallback computation
   - fallback computation -> output pdf index -> output pdf normal column -> reset psdf index



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #44869:
URL: https://github.com/apache/spark/pull/44869#issuecomment-1909732054

   > Nice!
   > 
   > Just for my personal clarity, so now only `DataFrame.asfreq` and `DataFrame.asof` can be fallback to Pandas when the option is `True`, right?
   
   yes, I think we need to check other missing methods by adding UTs.
   and some cases can not be simply supported in this way, e.g. io functions like `to_hdf`, plotting functions, multiple frame operations like `compare`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465690543


##########
python/pyspark/pandas/config.py:
##########
@@ -273,6 +273,15 @@ def validate(self, v: Any) -> None:
             "'plotting.max_rows' should be greater than or equal to 0.",
         ),
     ),
+    Option(
+        key="compute.pandas_fallback",

Review Comment:
   Might need to edit the docs (see the top:
   
   ```
   # NOTE: if you are fixing or adding an option here, make sure you execute `show_options()` and
   #     copy & paste the results into show_options
   #     'docs/source/user_guide/pandas_on_spark/options.rst' as well.
   #     See the examples below:
   #     >>> from pyspark.pandas.config import show_options
   #     >>> show_options()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465819924


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"
+            tmp_idx_column_name = f"__tmp_index_col_for_frame_{method}_{uid}__"

Review Comment:
   I guess we need to handle index separate:
   1, the fallback pandas method itself may change the index (change the index column or change the values in the index);
   2, `df.groupby.apply` returns a psdf with a multi-index containing both original index and the group column, which is not needed in the fallback case. e.g.
   
   ```
   In [12]: input_df
   Out[12]:
                          s  __tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__
   2000-01-01 00:00:00  0.0                                                                        0
   2000-01-01 00:01:00  NaN                                                                        0
   2000-01-01 00:02:00  2.0                                                                        0
   2000-01-01 00:03:00  3.0                                                                        0
   
   In [13]: input_df.index
   Out[13]:
   DatetimeIndex(['2000-01-01 00:00:00', '2000-01-01 00:01:00',
                  '2000-01-01 00:02:00', '2000-01-01 00:03:00'],
                 dtype='datetime64[ns]', freq=None)
   
   In [14]: output_df = input_df.groupby(tmp_agg_column_name).apply(lambda df: df)
   /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: If the type hints is not specified for `groupby.apply`, it is expensive to infer the data type internally.
     warnings.warn(message, PandasAPIOnSparkAdviceWarning)
   
   In [15]: output_df.index
   Out[15]:
   MultiIndex([(0, '2000-01-01 00:00:00'),
               (0, '2000-01-01 00:01:00'),
               (0, '2000-01-01 00:02:00'),
               (0, '2000-01-01 00:03:00')],
              names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465819924


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"
+            tmp_idx_column_name = f"__tmp_index_col_for_frame_{method}_{uid}__"

Review Comment:
   I guess we need to handle index separate:
   1, the fallback pandas method itself may change the index (change the index column or change the values in the index);
   2, `df.groupby.apply` returns a psdf with a multi-index containing both original index and the group column, which is not needed in the fallback. e.g.
   
   ```
   In [11]: output_df.index
   Out[11]:
   MultiIndex([(0, '2000-01-01 00:00:00'),
               (0, '2000-01-01 00:01:00'),
               (0, '2000-01-01 00:02:00'),
               (0, '2000-01-01 00:03:00')],
              names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])
   
   In [12]: input_df
   Out[12]:
                          s  __tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__
   2000-01-01 00:00:00  0.0                                                                        0
   2000-01-01 00:01:00  NaN                                                                        0
   2000-01-01 00:02:00  2.0                                                                        0
   2000-01-01 00:03:00  3.0                                                                        0
   
   In [13]: input_df.index
   Out[13]:
   DatetimeIndex(['2000-01-01 00:00:00', '2000-01-01 00:01:00',
                  '2000-01-01 00:02:00', '2000-01-01 00:03:00'],
                 dtype='datetime64[ns]', freq=None)
   
   In [14]: output_df = input_df.groupby(tmp_agg_column_name).apply(lambda df: df)
   /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: If the type hints is not specified for `groupby.apply`, it is expensive to infer the data type internally.
     warnings.warn(message, PandasAPIOnSparkAdviceWarning)
   
   In [15]: output_df.index
   Out[15]:
   MultiIndex([(0, '2000-01-01 00:00:00'),
               (0, '2000-01-01 00:01:00'),
               (0, '2000-01-01 00:02:00'),
               (0, '2000-01-01 00:03:00')],
              names=['__tmp_aggregate_col_for_frame_asfreq_3a5c07ce00424d8fbed5408f4192879c__', None])
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1465695774


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13447,46 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function_(*inputs: Any, **kwargs: Any):
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the driver's memory"
+                f" to infer the schema, and loads all data into one executor's memory to compute. "
+                "It should only be used if the pandas DataFrame is expected to be small."
+            )
+            input_df = self.copy()
+
+            uid = str(uuid.uuid4()).replace("-", "")
+            tmp_agg_column_name = f"__tmp_aggregate_col_for_frame_{method}_{uid}__"
+            tmp_idx_column_name = f"__tmp_index_col_for_frame_{method}_{uid}__"

Review Comment:
   agg column I understood but do we need to manually handle index too? If the schema is inferred, it should automatically address index as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame methods [spark]

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #44869:
URL: https://github.com/apache/spark/pull/44869#discussion_r1466057270


##########
python/pyspark/pandas/frame.py:
##########
@@ -13446,10 +13446,53 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function(*inputs: Any, **kwargs: Any) -> "DataFrame":
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial data into the "
+                f"driver's memory to infer the schema, and loads all data into one executor's "
+                f"memory to compute. It should only be used if the pandas DataFrame is expected "
+                f"to be small."
+            )
+
+            input_df = self.copy()
+            index_names = input_df.index.names
+
+            sdf = input_df._internal.spark_frame
+            tmp_agg_column_name = verify_temp_column_name(
+                sdf, f"__tmp_aggregate_col_for_frame_{method}__"
+            )
+            input_df[tmp_agg_column_name] = 0
+
+            tmp_idx_column_name = verify_temp_column_name(
+                sdf, f"__tmp_index_col_for_frame_{method}__"
+            )
+            input_df[tmp_idx_column_name] = input_df.index
+
+            # TODO: specify the return type if possible

Review Comment:
   Not sure if we want to create a JIRA for future work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org