Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/04/21 07:37:32 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #40896: [WIP][ML][PYTHON][CONNECT] Support Barrier Python UDF

zhengruifeng opened a new pull request, #40896:
URL: https://github.com/apache/spark/pull/40896

   ### What changes were proposed in this pull request?
   Support Barrier Python UDF. We could support it in general, but for now I want to narrow its usage scope, because it is currently only used in Torch Distributor:
   
   - it is supported as a function attribute, without adding a user-facing API or Parameter;
   - it only works in `MapInPandas` and `MapInArrow`
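
   As a rough illustration of the function-attribute approach, here is a plain-Python sketch with no Spark dependency. It is not the code in this PR: `is_barrier_udf` is a hypothetical helper, and the rows are plain dicts rather than pandas DataFrames. Only the attribute name `_is_barrier` comes from the diff in this thread.

```python
from typing import Callable, Iterator


def filter_func(iterator: Iterator[dict]) -> Iterator[dict]:
    """Toy map function: keep only the rows whose id is 1."""
    for row in iterator:
        if row["id"] == 1:
            yield row


def is_barrier_udf(f: Callable) -> bool:
    """Hypothetical helper: read the private marker, defaulting to False."""
    return bool(getattr(f, "_is_barrier", False))


# Mark the function as a barrier UDF without adding any public API or parameter.
filter_func._is_barrier = True
```

   Because the marker is read with a default, functions that never set it behave exactly as before.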
   
   
   ### Why are the changes needed?
   Barrier should be a UDF attribute.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   existing UTs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WeichenXu123 commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1543855959

   > It seems to me that functions under python/pyspark/sql/pandas/utils.py are used internally - in the existing Spark source code only.
   
   This might be an issue, because the 3rd-party project "xgboost-spark" is going to use it. Currently the "xgboost-spark" project already uses the barrier RDD API, and in the future we want to make it support Spark Connect.




[GitHub] [spark] HyukjinKwon commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1543373200

   Not sure why we want to keep `barrier` as a standalone API if it cannot work together with other similar APIs. Why don't we just add a param to `mapInPandas` and `mapInArrow`?




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1188193093


##########
python/pyspark/sql/pandas/utils.py:
##########
@@ -84,3 +87,57 @@ def pyarrow_version_less_than_minimum(minimum_pyarrow_version: str) -> bool:
         return False
 
     return LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version)
+
+
+def barrier(f: Callable) -> Callable:

Review Comment:
   If we're going to have this standalone API, it should work together with other similar APIs like `groupby().applyInPandas`.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1177334616


##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -333,6 +333,9 @@ message PythonUDF {
   bytes command = 3;
   // (Required) Python version being used in the client.
   string python_ver = 4;
+  // (Optional) Whether this PythonUDF should be executed in barrier mode.

Review Comment:
   Since this barrier UDF is only used to integrate with other ML frameworks (PyTorch for now, XGBoost in the future), I guess it can be Python-only.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1178192015


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -57,9 +57,18 @@ case class PythonUDF(
     children: Seq[Expression],
     evalType: Int,
     udfDeterministic: Boolean,
-    resultId: ExprId = NamedExpression.newExprId)
+    resultId: ExprId = NamedExpression.newExprId,
+    isBarrier: Boolean = false)
   extends Expression with Unevaluable with NonSQLExpression with UserDefinedExpression {
 
+  if (isBarrier &&
+    evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF &&
+    evalType != PythonEvalType.SQL_MAP_ARROW_ITER_UDF) {

Review Comment:
   ```suggestion
         evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF &&
         evalType != PythonEvalType.SQL_MAP_ARROW_ITER_UDF) {
   ```
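
   The condition in this hunk can be sketched in plain Python as follows. The enum is an illustrative stand-in for Spark's `PythonEvalType` constants (member values are arbitrary), `check_barrier` is a hypothetical name, and raising an error inside the `if` is an assumption, since the hunk does not show the body of the check.

```python
from enum import Enum, auto


class EvalType(Enum):
    """Illustrative stand-ins for Spark's PythonEvalType constants."""

    SQL_BATCHED_UDF = auto()
    SQL_MAP_PANDAS_ITER_UDF = auto()
    SQL_MAP_ARROW_ITER_UDF = auto()


# The two eval types that the condition in the hunk exempts.
BARRIER_EVAL_TYPES = frozenset(
    {EvalType.SQL_MAP_PANDAS_ITER_UDF, EvalType.SQL_MAP_ARROW_ITER_UDF}
)


def check_barrier(eval_type: EvalType, is_barrier: bool) -> None:
    """Assumed behavior: reject barrier mode for any other eval type."""
    if is_barrier and eval_type not in BARRIER_EVAL_TYPES:
        raise ValueError(f"Barrier mode is not supported for {eval_type.name}")
```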





[GitHub] [spark] zhengruifeng commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1527230215

   > Q: Where is related doc and example usage code for this attribute ?
   
   I will add it.
   
   
   > Q2: Why it is not a user-facing attribute ? We have 3rd-party library xgboost-spark that uses "barrier mode" and we need to make xgboost-spark supports pyspark-connect mode.
   
   We can add docs and tests for it, to avoid breaking it in the future.
   
   But it is a kind of `DeveloperApi`, and our usage pattern is pretty limited. Moreover, it is likely to lead to weird behavior or failures for end users, due to the limitations of the underlying `RDDBarrier`.




[GitHub] [spark] hvanhovell commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1176684697


##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -333,6 +333,9 @@ message PythonUDF {
   bytes command = 3;
   // (Required) Python version being used in the client.
   string python_ver = 4;
+  // (Optional) Whether this PythonUDF should be executed in barrier mode.

Review Comment:
   Why limit this to python?





[GitHub] [spark] WeichenXu123 commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1542107229

   > It seems to me that functions under `python/pyspark/sql/pandas/utils.py` are used internally - in the existing Spark source code only.
   > 
   > A developer API "should" still be called externally, by developers though, e.g. `semanticHash`.
   > 
   > So defining a developer API in `utils.py` seems a little unclear to me.
   
   Can we move the `barrier` API definition to a proper place?




[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1177333283


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -826,9 +826,6 @@ message MapPartitions {
 
   // (Required) Input user-defined function.
   CommonInlineUserDefinedFunction func = 2;
-
-  // (Optional) Whether to use barrier mode execution or not.
-  optional bool is_barrier = 3;

Review Comment:
   Yes, but this field was only added in master, so it is not a breaking change against 3.4.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1182025735


##########
python/pyspark/sql/udf.py:
##########
@@ -249,6 +259,38 @@ def __init__(
         self.evalType = evalType
         self.deterministic = deterministic
 
+        # since 3.5.0, we introduce an internal optional function attribute '_is_barrier',
+        # which is dedicated for integration with external ML training frameworks including
+        # PyTorch and XGBoost.
+        # It indicates whether this UDF will be executed on barrier mode, and is only accepted
+        # in methods 'mapInPandas' and 'mapInArrow'.
+        # For example:
+        #
+        # df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
+        #
+        # def filter_func(iterator):
+        #     for pdf in iterator:
+        #         yield pdf[pdf.id == 1]
+        #
+        # filter_func._is_barrier = True # Mark this UDF is barrier

Review Comment:
   @WeichenXu123 I added an example here, and also added a UT to make sure it works.
   
   > Our xgboost users are already aware of the limitations, it is not an issue.
   > note that currently xgboost library( python package) already uses RDD barrier API,
   > in future we need to adapt xgboost with spark connect mode,
   > this means the SQL side barrier flag should also be an user-facing interface.
   
   I'm aware of the integration with XGBoost. It looks like only developers will use it. Is the current function-attribute implementation enough to support it?
   
   UDFs are used much more widely than the RDDBarrier APIs, and my concern is that end users are likely to abuse it.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1178518856


##########
python/pyspark/sql/udf.py:
##########
@@ -465,6 +476,7 @@ def wrapper(*args: "ColumnOrName") -> Column:
         wrapper.returnType = self.returnType  # type: ignore[attr-defined]
         wrapper.evalType = self.evalType  # type: ignore[attr-defined]
         wrapper.deterministic = self.deterministic  # type: ignore[attr-defined]
+        wrapper.is_barrier = self.is_barrier  # type: ignore[attr-defined]

Review Comment:
   got it, let me rename it and add more comments





[GitHub] [spark] xinrong-meng commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1540629445

   It seems to me that functions under `python/pyspark/sql/pandas/utils.py` are used internally - in the existing Spark source code only.
   
   A developer API "should" still be called externally, by developers though,  e.g. `semanticHash`.
   
   So defining a developer API in `utils.py` seems a little unclear to me.
   




[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1184588603


##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -333,6 +333,9 @@ message PythonUDF {
   bytes command = 3;
   // (Required) Python version being used in the client.
   string python_ver = 4;
+  // (Optional) Whether this PythonUDF should be executed in barrier mode.

Review Comment:
   good point, will update. thx





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1184721358


##########
python/pyspark/sql/udf.py:
##########
@@ -249,6 +259,38 @@ def __init__(
         self.evalType = evalType
         self.deterministic = deterministic
 
+        # since 3.5.0, we introduce an internal optional function attribute '_is_barrier',
+        # which is dedicated for integration with external ML training frameworks including
+        # PyTorch and XGBoost.
+        # It indicates whether this UDF will be executed on barrier mode, and is only accepted
+        # in methods 'mapInPandas' and 'mapInArrow'.
+        # For example:
+        #
+        # df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
+        #
+        # def filter_func(iterator):
+        #     for pdf in iterator:
+        #         yield pdf[pdf.id == 1]
+        #
+        # filter_func._is_barrier = True # Mark this UDF is barrier

Review Comment:
   Defining a `@barrier` decorator looks better, and we can document in the `barrier` docstring that this is a developer API and that we should keep it stable.
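
   A decorator along these lines could be as small as the sketch below. This is only an assumption about its shape, not the implementation that was later pushed to the PR: it reuses the `_is_barrier` marker from the diff above, and the toy `filter_func` works on plain dicts so the snippet runs without Spark.

```python
from typing import Callable, Iterator


def barrier(f: Callable) -> Callable:
    """Hypothetical decorator: mark `f` for barrier-mode execution."""
    if not callable(f):
        raise TypeError("barrier expects a callable")
    f._is_barrier = True  # same private marker as in the diff above
    return f  # the function itself is returned, so its name and docs survive


@barrier
def filter_func(iterator: Iterator[dict]) -> Iterator[dict]:
    """Toy map function: keep only the rows whose id is 1."""
    for row in iterator:
        if row["id"] == 1:
            yield row
```

   Returning the original function rather than a wrapper keeps the decorated UDF picklable and inspectable in the same way as an undecorated one.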





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1184722725


##########
python/pyspark/sql/udf.py:
##########
@@ -249,6 +259,38 @@ def __init__(
         self.evalType = evalType
         self.deterministic = deterministic
 
+        # since 3.5.0, we introduce an internal optional function attribute '_is_barrier',
+        # which is dedicated for integration with external ML training frameworks including
+        # PyTorch and XGBoost.
+        # It indicates whether this UDF will be executed on barrier mode, and is only accepted
+        # in methods 'mapInPandas' and 'mapInArrow'.
+        # For example:
+        #
+        # df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
+        #
+        # def filter_func(iterator):
+        #     for pdf in iterator:
+        #         yield pdf[pdf.id == 1]
+        #
+        # filter_func._is_barrier = True # Mark this UDF is barrier

Review Comment:
   will have a try





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1184590924


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -73,9 +73,18 @@ case class PythonUDF(
     children: Seq[Expression],
     evalType: Int,
     udfDeterministic: Boolean,
-    resultId: ExprId = NamedExpression.newExprId)
+    resultId: ExprId = NamedExpression.newExprId,
+    isBarrier: Boolean = false)

Review Comment:
   > Do we expect more configs like this to be introduced in the future?
   
   I think we will not introduce more ML-specific configs.  
   FYI, We might introduce [dependency management](https://github.com/apache/spark/pull/40954) and stage-level scheduling in the future, but they are both more general.
   
   > Have we considered using a metadata or options map here?
   
   Good question. I am now considering whether we can use `TreeNodeTag` to avoid changing the `PythonUDF` expression.
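
   The trade-off can be pictured with a toy Python model. Everything here is hypothetical (`PythonUDFNode`, `set_tag`, `get_tag`, and the tag name are illustrative and are not Spark classes or methods); it only shows the idea of carrying the flag as side-channel metadata instead of adding a constructor field.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class PythonUDFNode:
    """Toy stand-in for an expression node; not Spark's PythonUDF class."""

    name: str
    eval_type: int
    # Side-channel metadata, excluded from equality so that tagging a node
    # does not change how it compares with an untagged one.
    tags: Dict[str, Any] = field(default_factory=dict, compare=False)

    def set_tag(self, key: str, value: Any) -> None:
        self.tags[key] = value

    def get_tag(self, key: str) -> Optional[Any]:
        return self.tags.get(key)


IS_BARRIER_TAG = "is_barrier"  # hypothetical tag name

udf = PythonUDFNode(name="filter_func", eval_type=0)  # 0 is a placeholder
udf.set_tag(IS_BARRIER_TAG, True)
```

   With this shape the node's signature stays unchanged, at the cost of the flag being invisible in the constructor.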





[GitHub] [spark] WeichenXu123 commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1526988838

   > This PR will not add a user-facing API or Parameter or Annotation, instead only a private function attribute will be added
   
   Q: Where are the related docs and example usage code for this attribute?
   
   Q2: Why is it not a user-facing attribute? We have the 3rd-party library xgboost-spark, which uses "barrier mode", and we need to make xgboost-spark support pyspark-connect mode.




[GitHub] [spark] zhengruifeng commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1542989294

   > If we're going to have this standalone API, this should work together with other similar API like groupby().applyInPandas.
   
   I think we won't support other Pandas APIs such as `groupby().applyInPandas`:
   
   - it is non-trivial to support due to the limitations of `RDDBarrier`;
   - the ML side doesn't need them for now.
   
   
   > It seems to me that functions under python/pyspark/sql/pandas/utils.py are used internally - in the existing Spark source code only.
   
   > A developer API "should" still be called externally, by developers though, e.g. semanticHash.
   
   That is a good point. What about keeping `barrier` in `pandas/utils.py` and only using it internally, like other helper functions?
   
   @HyukjinKwon @WeichenXu123 @xinrong-meng 




[GitHub] [spark] allisonwang-db commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1184564949


##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -333,6 +333,9 @@ message PythonUDF {
   bytes command = 3;
   // (Required) Python version being used in the client.
   string python_ver = 4;
+  // (Optional) Whether this PythonUDF should be executed in barrier mode.

Review Comment:
   Can you briefly describe in the comment what barrier mode is used for?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -73,9 +73,18 @@ case class PythonUDF(
     children: Seq[Expression],
     evalType: Int,
     udfDeterministic: Boolean,
-    resultId: ExprId = NamedExpression.newExprId)
+    resultId: ExprId = NamedExpression.newExprId,
+    isBarrier: Boolean = false)

Review Comment:
   This seems to be very specific to the ML use case and not generic for Python UDFs. Do we expect more configs like this to be introduced in the future? Have we considered using a metadata or options map here?





[GitHub] [spark] zhengruifeng commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1548778624

   I think we (@HyukjinKwon @WeichenXu123 and I) have reached agreement that this PR (which introduces an `@barrier` annotation) is no longer needed, and we will keep the current `barrier` parameter in `MapInPandas/Arrow`.
   
   The reason is that a barrier UDF is still too specific to `MapInPandas/Arrow`, due to the limitations of the underlying `RDDBarrier`:
   
   - it cannot be used within common DataFrame operators like `select` and `withColumn`;
   - it cannot be used within other similar pandas functions like `applyInPandas`.
   
   I will close it in two days if there are no more comments.
   
   also cc @mengxr 
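
   For reference, using the retained parameter would look roughly like the sketch below. It assumes a PySpark build in which `DataFrame.mapInPandas` accepts the `barrier` keyword discussed in this thread. The helper `run_in_barrier_mode` is a hypothetical name, and the snippet only defines functions, so it does not need a running Spark session.

```python
def filter_func(iterator):
    """Map function over an iterator of pandas DataFrames."""
    for pdf in iterator:
        yield pdf[pdf.id == 1]


def run_in_barrier_mode(df):
    """Hypothetical helper: apply filter_func with barrier execution enabled.

    `df` is expected to be a pyspark.sql.DataFrame.
    """
    return df.mapInPandas(filter_func, df.schema, barrier=True)


# Usage with a live SparkSession (not executed here):
#   df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
#   run_in_barrier_mode(df).show()
```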
   




[GitHub] [spark] hvanhovell commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1176683940


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -826,9 +826,6 @@ message MapPartitions {
 
   // (Required) Input user-defined function.
   CommonInlineUserDefinedFunction func = 2;
-
-  // (Optional) Whether to use barrier mode execution or not.
-  optional bool is_barrier = 3;

Review Comment:
   This is a breaking change, right?





[GitHub] [spark] xinrong-meng commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1553398850

   +1 for the proposal, thanks @zhengruifeng !
   
   > I think we (@HyukjinKwon @WeichenXu123 and I) have reach to agreement that this PR (which introduces an `@barrier` annotation) is no longer, and we will keep current `barrier` parameter in `MapInPandas/Arrow`.
   
   




[GitHub] [spark] zhengruifeng closed pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF
URL: https://github.com/apache/spark/pull/40896




[GitHub] [spark] WeichenXu123 commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1527643314

   > But it is kind of DeveloperApi, our usage pattern is pretty limited. Moreover, it's likely to lead to weird behavior/failure to end users, due to the underlying RDDBarrier limitation.
   
   Our xgboost users are already aware of these limitations, so this is not an issue.
   Note that the xgboost library (the Python package) already uses the RDD barrier API.
   In the future we need to adapt xgboost to Spark Connect mode,
   which means the SQL-side barrier flag should also be a user-facing interface.




[GitHub] [spark] zhengruifeng commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1539284439

   Let me move the functions to `sql.pandas.utils`; they will be less visible there.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1177334616


##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -333,6 +333,9 @@ message PythonUDF {
   bytes command = 3;
   // (Required) Python version being used in the client.
   string python_ver = 4;
+  // (Optional) Whether this PythonUDF should be executed in barrier mode.

Review Comment:
   Since this barrier UDF is only used to integrate other ML frameworks (PyTorch for now, with xgboost support to follow), I guess it can be Python-only.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1177478664


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -618,6 +618,7 @@ def set_gpus(context: "BarrierTaskContext") -> None:
 
                 yield pd.DataFrame(data={"chunk": chunks})
 
+        wrapped_train_fn.is_barrier = True  # type: ignore[attr-defined]

Review Comment:
   This is how a UDF is marked as `barrier`.
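
The attribute-based marking discussed here can be illustrated in plain Python, without Spark. This is only a minimal sketch: `mark_barrier` and `is_barrier_fn` are hypothetical helpers invented for illustration and are not part of PySpark or of this PR. Only the `is_barrier` attribute name comes from the diff above.

```python
from typing import Any, Callable, Iterator


def mark_barrier(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical helper: tag `fn` so a caller could choose barrier execution."""
    fn.is_barrier = True  # type: ignore[attr-defined]
    return fn


def is_barrier_fn(fn: Callable[..., Any]) -> bool:
    """Hypothetical reader: functions without the attribute count as non-barrier."""
    return bool(getattr(fn, "is_barrier", False))


def train_fn(batches: Iterator[list]) -> Iterator[list]:
    # Stand-in for a training function; it just passes batches through.
    for batch in batches:
        yield batch


def plain_fn(batches: Iterator[list]) -> Iterator[list]:
    # Never marked, so it should be treated as a regular function.
    yield from batches


mark_barrier(train_fn)
```

Because the flag lives on the function object, no new user-facing parameter is needed, and reading it with `getattr(..., False)` keeps unmarked functions on the regular execution path.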





[GitHub] [spark] zhengruifeng commented on pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40896:
URL: https://github.com/apache/spark/pull/40896#issuecomment-1522945999

   cc @WeichenXu123 @HyukjinKwon 




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40896: [SPARK-43229][ML][PYTHON][CONNECT] Introduce Barrier Python UDF

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40896:
URL: https://github.com/apache/spark/pull/40896#discussion_r1178191685


##########
python/pyspark/sql/udf.py:
##########
@@ -465,6 +476,7 @@ def wrapper(*args: "ColumnOrName") -> Column:
         wrapper.returnType = self.returnType  # type: ignore[attr-defined]
         wrapper.evalType = self.evalType  # type: ignore[attr-defined]
         wrapper.deterministic = self.deterministic  # type: ignore[attr-defined]
+        wrapper.is_barrier = self.is_barrier  # type: ignore[attr-defined]

Review Comment:
   I would keep this as an internal property by naming it `_is_barrier`.
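
As a rough illustration of this suggestion, the flag can be copied onto the wrapper under a leading-underscore name, which by Python convention signals an internal attribute. `SketchUDF` below is a hypothetical stand-in written for this example; it is not PySpark's `UserDefinedFunction` and omits everything except the attribute propagation.

```python
import functools
from typing import Any, Callable


class SketchUDF:
    """Hypothetical stand-in for a UDF object; not PySpark's class."""

    def __init__(self, func: Callable[..., Any], is_barrier: bool = False) -> None:
        self.func = func
        # Leading underscore: internal by convention, not part of the public API.
        self._is_barrier = is_barrier

    def wrapped(self) -> Callable[..., Any]:
        @functools.wraps(self.func)
        def wrapper(*args: Any) -> Any:
            return self.func(*args)

        # Copy the internal flag onto the callable that is handed back.
        wrapper._is_barrier = self._is_barrier  # type: ignore[attr-defined]
        return wrapper


def add_one(x: int) -> int:
    return x + 1


barrier_add_one = SketchUDF(add_one, is_barrier=True).wrapped()
```

The wrapper still behaves like `add_one`, while `barrier_add_one._is_barrier` carries the flag for internal consumers only.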


