Posted to reviews@spark.apache.org by "zhenlineo (via GitHub)" <gi...@apache.org> on 2023/07/19 05:23:21 UTC

[GitHub] [spark] zhenlineo opened a new pull request, #42069: [WIP] Fix class loading problem caused by stub user classes not found on the server classpath

zhenlineo opened a new pull request, #42069:
URL: https://github.com/apache/spark/pull/42069

   TODO: throw a helpful error for method-not-found exceptions?
   Test with Maven.
   Test with Scala 2.13 and Scala 2.12.
   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   




[GitHub] [spark] zhenlineo commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1650802438

   @LuciferYang The 10 errors:
   * 8 streaming-related test failures: something is wrong with the class loader; still investigating.
   * 2 new UDF loading test failures: they need the client jar file, so these tests cannot run with `mvn clean`. I will add a warning to help with these two failures.




[GitHub] [spark] LuciferYang commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1649433054

   > Checked the maven test with this PR; there are `10 TESTS FAILED`. Further confirmation is needed to determine whether all of them are related to this PR:
   > 
   > run
   > 
   > ```
   > build/mvn clean install -DskipTests -Phive
   > build/mvn clean test -pl connector/connect/client/jvm
   > ```
   > 
   > ```
   > FlatMapGroupsWithStateStreamingSuite:
   > - flatMapGroupsWithState - streaming *** FAILED ***
   >   org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
   >   at scala.collection.Iterator.toStream(Iterator.scala:1417)
   >   at scala.collection.Iterator.toStream$(Iterator.scala:1416)
   >   at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
   >   at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
   >   at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
   >   at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
   >   ...
   > - flatMapGroupsWithState - streaming - with initial state *** FAILED ***
   >   org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
   >   at scala.collection.Iterator.toStream(Iterator.scala:1417)
   >   at scala.collection.Iterator.toStream$(Iterator.scala:1416)
   >   at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
   >   at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
   >   at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
   >   at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
   >   ...
   > - mapGroupsWithState - streaming *** FAILED ***
   >   org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
   >   at scala.collection.Iterator.toStream(Iterator.scala:1417)
   >   at scala.collection.Iterator.toStream$(Iterator.scala:1416)
   >   at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
   >   at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
   >   at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
   >   at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
   >   ...
   > - mapGroupsWithState - streaming - with initial state *** FAILED ***
   >   org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
   >   at scala.collection.Iterator.toStream(Iterator.scala:1417)
   >   at scala.collection.Iterator.toStream$(Iterator.scala:1416)
   >   at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
   >   at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
   >   at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
   >   at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
   >   ...
   > - flatMapGroupsWithState *** FAILED ***
   >   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 489.0 failed 1 times, most recent failure: Lost task 0.0 in stage 489.0 (TID 1997) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   > 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:87)
   > 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   > 	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   > 	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   > 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   > 	at scala.collection.Iterator.foreach(Iterator.scala:943)
   > 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   > 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   > 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   > 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   > 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   > 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   > 	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   > 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   > 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   > 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   > 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   > 	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
   >   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
   >   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
   >   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
   >   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
   >   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
   >   at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
   >   ...
   > - flatMapGroupsWithState - with initial state *** FAILED ***
   >   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 494.0 failed 1 times, most recent failure: Lost task 0.0 in stage 494.0 (TID 2006) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   > 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:87)
   > 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   > 	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   > 	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   > 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   > 	at scala.collection.Iterator.foreach(Iterator.scala:943)
   > 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   > 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   > 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   > 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   > 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   > 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   > 	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   > 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   > 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   > 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   > 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   > 	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
   >   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
   >   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
   >   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
   >   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
   >   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
   >   at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
   >   ...
   > - mapGroupsWithState *** FAILED ***
   >   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 497.0 failed 1 times, most recent failure: Lost task 0.0 in stage 497.0 (TID 2013) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   > 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:87)
   > 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   > 	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   > 	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   > 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   > 	at scala.collection.Iterator.foreach(Iterator.scala:943)
   > 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   > 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   > 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   > 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   > 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   > 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   > 	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   > 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   > 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   > 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   > 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   > 	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
   >   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
   >   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
   >   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
   >   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
   >   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
   >   at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
   >   ...
   > - mapGroupsWithState - with initial state *** FAILED ***
   >   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 502.0 failed 1 times, most recent failure: Lost task 0.0 in stage 502.0 (TID 2022) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   > 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:87)
   > 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   > 	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   > 	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   > 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   > 	at scala.collection.Iterator.foreach(Iterator.scala:943)
   > 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   > 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   > 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   > 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   > 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   > 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   > 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   > 	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   > 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   > 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   > 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   > 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   > 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   > 	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
   >   at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
   >   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
   >   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
   >   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
   >   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
   >   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
   >   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
   >   at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
   >   ...
   > - update class loader after stubbing: new session *** FAILED ***
   >   java.io.NotSerializableException: org.scalatest.Engine
   >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
   >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   >   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
   >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   >   ...
   > - update class loader after stubbing: same session *** FAILED ***
   >   java.io.NotSerializableException: org.scalatest.Engine
   >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
   >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   >   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
   >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
   >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   >   ...
   > *** 10 TESTS FAILED ***
   > ```
   
   All of these test failures occur only with this PR, but the PR also fixes four test failures in `SparkSessionE2ESuite`.
   
   




[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278180938


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -554,7 +554,7 @@ class SparkSession private[sql] (
     val command = proto.Command.newBuilder().setRegisterFunction(udf).build()
     val plan = proto.Plan.newBuilder().setCommand(command).build()
 
-    client.execute(plan)
+    client.execute(plan).asScala.foreach(_ => ())

Review Comment:
   Why is this needed?





[GitHub] [spark] hvanhovell commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1656530635

   Merging this, it fixes a pretty big UX issue for UDFs!




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278586088


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)

Review Comment:
   IDK about that... don't you think that if Spark Connect is used inside a larger real-life application, there may be many user classes that are unrelated to anything the user wishes to execute on the Spark cluster, just various application business logic that can get captured by accident just as well?





[GitHub] [spark] LuciferYang commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1273081014


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala:
##########
@@ -211,6 +199,23 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
         throw error
       }
     }
+
+    addClientTestArtifactInServerClasspath(spark)

Review Comment:
   should be moved into the `if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {` block





[GitHub] [spark] zhenlineo commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1650059521

   @LuciferYang Thanks for the detailed review. Let me fix them. The UDF test failures might be caused by my class loader ordering with the stub loader.




[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278226020


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##########
@@ -161,7 +162,19 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
    */
   def classloader: ClassLoader = {
     val urls = getSparkConnectAddedJars :+ classDir.toUri.toURL
-    new URLClassLoader(urls.toArray, Utils.getContextOrSparkClassLoader)
+    val loader = if (SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
+      val stubClassLoader =
+        StubClassLoader(null, SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
+      new ChildFirstURLClassLoader(

Review Comment:
   Probably should, to be consistent. Let me fix it in a follow-up.





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1274247476


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {

Review Comment:
   It worked okay on my machine. Could you try again after an sbt clean?





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278181125


##########
core/src/main/scala/org/apache/spark/util/StubClassLoader.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.xbean.asm9.{ClassWriter, Opcodes}
+
+/**
+ * [[ClassLoader]] that replaces missing classes with stubs, if they cannot be found. It will only
+ * do this for classes that are marked for stubbing.
+ *
+ * While this is generally not a good idea, in this particular case it is used to load lambdas
+ * whose capturing class contains unknown (and unneeded) classes. The lambda itself does not need
+ * the class and is therefore safe to replace with a stub.
+ */
+class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
+  extends ClassLoader(parent) {
+  override def findClass(name: String): Class[_] = {
+    if (!shouldStub(name)) {
+      throw new ClassNotFoundException(name)
+    }
+    val bytes = StubClassLoader.generateStub(name)
+    defineClass(name, bytes, 0, bytes.length)
+  }
+}
+
+object StubClassLoader {
+  def apply(parent: ClassLoader, binaryName: Seq[String]): StubClassLoader = {
+    new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
+  }
+
+  def generateStub(binaryName: String): Array[Byte] = {
+    // Convert binary names to internal names.
+    val name = binaryName.replace('.', '/')
+    val classWriter = new ClassWriter(0)
+    classWriter.visit(
+      49,
+      Opcodes.ACC_PUBLIC + Opcodes.ACC_SUPER,
+      name,
+      null,
+      "java/lang/Object",
+      null)
+    classWriter.visitSource(name + ".java", null)
+
+    // Generate constructor.
+    val ctorWriter = classWriter.visitMethod(

Review Comment:
   Can you file a follow-up to make this throw an exception?





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278225989


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)
+      .version("3.5.0")
+      .stringConf
+      .toSequence
+      .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)

Review Comment:
   I tested with multiple prefixes; it works too, e.g. "org.apache.spark.sql.connect.client, com.example".
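   
   As a hedged illustration (the property name comes from the config entry above; `com.example` is a placeholder prefix), the list could be overridden on the server side before starting Spark Connect, for example:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Sketch only: extend the default stub list with an extra, hypothetical prefix.
   // Setting the value to an empty string would disable stubbing entirely.
   val spark = SparkSession
     .builder()
     .config(
       "spark.connect.scalaUdf.stubClasses",
       "org.apache.spark.sql.connect.client,com.example")
     .getOrCreate()
   ```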





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278228203


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##########
@@ -161,7 +162,19 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
    */
   def classloader: ClassLoader = {
     val urls = getSparkConnectAddedJars :+ classDir.toUri.toURL
-    new URLClassLoader(urls.toArray, Utils.getContextOrSparkClassLoader)
+    val loader = if (SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
+      val stubClassLoader =
+        StubClassLoader(null, SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
+      new ChildFirstURLClassLoader(

Review Comment:
   Actually it is fine. There are three existing class loaders:
   
   User CL : classes added using --jar
   Sys CL: Spark + sys libs
   Session CL: classes added using session.addArtifacts
   
   In Executor:
   * normal: Sys -> (User + Session) -> Stub
   * reverse: (User + Session) -> Sys -> Stub 
   
   In Driver:
   * normal: (Sys + User) -> Session -> Stub
   * reverse: (User -> Sys) -> Session -> Stub 
   
   So what you see here is () -> Session -> Stub.
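   
   A minimal sketch of just that last "Session -> Stub" hop (not the exact wiring in this PR, which also threads a real parent through the new `ChildFirstURLClassLoader` overload; the session directory is a placeholder):
   
   ```scala
   import java.net.URL
   import org.apache.spark.util.{ChildFirstURLClassLoader, StubClassLoader}
   
   // Session artifacts are tried first (child-first); anything still missing
   // under the configured prefixes falls through to the stub loader.
   val sessionUrls: Array[URL] =
     Array(new java.io.File("/tmp/session-classes").toURI.toURL) // placeholder dir
   val stub = StubClassLoader(null, Seq("org.apache.spark.sql.connect.client"))
   val loader = new ChildFirstURLClassLoader(sessionUrls, stub)
   ```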





Re: [PR] [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath [spark]

Posted by "tenstriker (via GitHub)" <gi...@apache.org>.
tenstriker commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1970244097

   I think this PR is affecting external users as well. We start the spark-connect server with external jars and are hitting a similar ClassCastException due to a class loading issue: https://issues.apache.org/jira/browse/SPARK-46762




[GitHub] [spark] zhenlineo commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1654693559

   @LuciferYang @vicennial This is ready for another look, thanks.
   The PR should be merged to 3.5 as this is a bug fix for 3.5 UDFs. 
   cc @rednaxelafx @juliuszsompolski @hvanhovell 




[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278214458


##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -1101,11 +1147,21 @@ private[spark] class Executor(
           // Add it to our class loader
           val url = new File(root, localName).toURI.toURL
           if (!state.urlClassLoader.getURLs().contains(url)) {
-            logInfo(s"Adding $url to class loader")
+            logInfo(s"Adding $url to class loader ${state.sessionUUID}")
             state.urlClassLoader.addURL(url)
+            if (!isDefaultState(state.sessionUUID)) {
+              updated = true
+            }
           }
         }
       }
+      if (updated) {
+        // When a new url is added for non-default class loader, recreate the class loader
+        // to ensure all classes are updated.
+        state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, useStub = true)

Review Comment:
   Why do we recreate the URL classloader as well? Is that needed?





[GitHub] [spark] vicennial commented on a diff in pull request #42069: [WIP] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1267720733


##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -1102,12 +1117,19 @@ private[spark] class Executor(
           val url = new File(root, localName).toURI.toURL
           if (!state.urlClassLoader.getURLs().contains(url)) {
             logInfo(s"Adding $url to class loader")
-            state.urlClassLoader.addURL(url)
+            // TODO: make use of the session cache for the class loader.
+            // Currently we invalidate all when adding a new url to always clear the stubbed
+            // classes in the current repl class loader.
+            // This is not always needed if the newly added jar does not contain any stubbed
+            // classes.
+            isolatedSessionCache.invalidateAll()

Review Comment:
   Shouldn't we only invalidate the `isolatedSession` that is being updated? Each session has its own individual `URLClassLoader`





[GitHub] [spark] LuciferYang commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1273081014


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala:
##########
@@ -211,6 +199,23 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
         throw error
       }
     }
+
+    addClientTestArtifactInServerClasspath(spark)

Review Comment:
   should be moved into the `if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {` block



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {
+    // Session1 uses Stub SparkResult class
+    val session1 = spark.newSession()

Review Comment:
   Do we have to use different SparkSessions in order to avoid failures caused by duplicate Artifact additions?



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {
+    // Session1 uses Stub SparkResult class
+    val session1 = spark.newSession()
+    addClientTestArtifactInServerClasspath(session1)
+    val ds = session1.range(10).filter(n => n % 2 == 0)
+
+    val rows = ds.collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+
+    // Session2 uses the real SparkResult class
+    val session2 = spark.newSession()
+    addClientTestArtifactInServerClasspath(session2)
+    addClientTestArtifactInServerClasspath(session2, testJar = false)
+    val rows2 = session2
+      .range(10)
+      .filter(n => {
+        // Try to use spark result
+        new SparkResult[Int](null, null, null)

Review Comment:
   should pass `timeZoneId`



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala:
##########
@@ -30,7 +30,7 @@ object IntegrationTestUtils {
   // System properties used for testing and debugging
   private val DEBUG_SC_JVM_CLIENT = "spark.debug.sc.jvm.client"
   // Enable this flag to print all client debug log + server logs to the console
-  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
+  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "true").toBoolean

Review Comment:
   Will this change be reversed in the end?
   
   



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma separated list of binary names of classes/packages that should be stub during the
+          |Scala UDF serdeser and execution if not found on the server classpath.

Review Comment:
   ```suggestion
             |Scala UDF serde and execution if not found on the server classpath.
   ```



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {

Review Comment:
   Locally ran `build/sbt clean "connect-client-jvm/testOnly *UDFClassLoadingE2ESuite"` and got:
   
   ```
   [info] - update class loader after stubbing: new session *** FAILED *** (148 milliseconds)
   [info]   java.io.NotSerializableException: org.scalatest.Engine
   
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma separated list of binary names of classes/packages that should be stub during the
+          |Scala UDF serdeser and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default the server stubs classes from the Scala client package.

Review Comment:
   ```suggestion
             |By default, the server stubs classes from the Scala client package.
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma separated list of binary names of classes/packages that should be stub during the

Review Comment:
   ```suggestion
             |Comma-separated list of binary names of classes/packages that should be stubbed during the
   ```





[GitHub] [spark] vicennial commented on a diff in pull request #42069: [WIP] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1267726085


##########
core/src/main/java/org/apache/spark/util/ChildFirstURLClassLoader.java:
##########
@@ -40,6 +40,15 @@ public ChildFirstURLClassLoader(URL[] urls, ClassLoader parent) {
     this.parent = new ParentClassLoader(parent);
   }
 
+  /**
+   * Specify the realParent if there is a need to load in the order of
+   * `realParent -> urls (child) -> parent`.

Review Comment:
   What does `realParent` mean here?
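
Purely as an illustration of the lookup order described in the new doc comment (`realParent -> urls (child) -> parent`), a hypothetical loader; this is not the actual `ChildFirstURLClassLoader` change, just a sketch of the idea:

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical sketch: consult realParent first, then the child URLs, then the parent.
class OrderedLookupLoader(urls: Array[URL], parent: ClassLoader, realParent: ClassLoader)
  extends URLClassLoader(urls, null: ClassLoader) {

  override def loadClass(name: String): Class[_] = {
    try realParent.loadClass(name)            // 1. realParent
    catch {
      case _: ClassNotFoundException =>
        try super.loadClass(name)             // 2. urls (child)
        catch {
          case _: ClassNotFoundException =>
            parent.loadClass(name)            // 3. parent
        }
    }
  }
}
```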





[GitHub] [spark] LuciferYang commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1275071984


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {

Review Comment:
   It works okay on my machine now too





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1274247845


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.util.Arrays
+
+import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class UDFClassLoadingE2ESuite extends RemoteSparkSession {
+
+  test("load udf with default stub class loader") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("update class loader after stubbing: new session") {
+    // Session1 uses Stub SparkResult class
+    val session1 = spark.newSession()

Review Comment:
   No, I was not testing any artifact duplication. It is purely to make the test start with a clean session.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278301295


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1504,15 +1506,24 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
   }
 
   private def unpackUdf(fun: proto.CommonInlineUserDefinedFunction): UdfPacket = {
-    Utils.deserialize[UdfPacket](
-      fun.getScalarScalaUdf.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[UdfPacket](fun.getScalarScalaUdf)
   }
 
   private def unpackForeachWriter(fun: proto.ScalarScalaUDF): ForeachWriterPacket = {
-    Utils.deserialize[ForeachWriterPacket](
-      fun.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[ForeachWriterPacket](fun)
+  }
+
+  private def unpackScalarScalaUDF[T](fun: proto.ScalarScalaUDF): T = {
+    try {
+      logDebug(s"Unpack using class loader: ${Utils.getContextOrSparkClassLoader}")
+      Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
+    } catch {
+      case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
+        throw new ClassNotFoundException(
+          s"Failed to load class correctly due to ${e.getCause}. " +
+            "Make sure the artifact where the class is defined is installed by calling" +
+            " session.addArtifact.")

Review Comment:
   In the description you write
   > If the user code is actually needed to execute the UDF, we will return an error message to suggest the user to add the missing classes using the addArtifact method.
   
   but since this triggers during deserialization, wouldn't this trigger also for a class that is not actually used, just accidentally pulled in, and not captured by the CONNECT_SCALA_UDF_STUB_CLASSES config?
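
For context, a hedged sketch of the "accidentally pulled in" case the question refers to (all class names below are made up): a lambda that captures its enclosing instance drags in fields whose classes are never used by the UDF itself.

```scala
// Hypothetical client-side classes, for illustration only.
class ReportingHelper extends Serializable {
  def report(msg: String): Unit = println(msg)
}

class MyPipeline(val helper: ReportingHelper) extends Serializable {
  def threshold: Long = 2
  // The lambda calls `threshold`, so it captures the enclosing MyPipeline instance.
  // Serializing the lambda therefore also serializes `helper`, and deserializing
  // the UDF on the server needs ReportingHelper on the classpath even though the
  // UDF body never uses that class.
  val isEven: Long => Boolean = (n: Long) => n % threshold == 0
}
```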





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278583715


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)

Review Comment:
   I generally expect user classes to be present on the classpath; if they are not, the user needs to do something anyway. The internal classes are a bit special because they can be captured by accident, so stubbing makes more sense there.





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278375346


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)

Review Comment:
   The stub class loader would currently be used for all `withSession` calls on drivers, and for all task runs on executors.
   Perhaps we should restrict stubbing to UDF class loading on drivers, combined with a more aggressive default, e.g. "org, com".



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1504,15 +1506,24 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
   }
 
   private def unpackUdf(fun: proto.CommonInlineUserDefinedFunction): UdfPacket = {
-    Utils.deserialize[UdfPacket](
-      fun.getScalarScalaUdf.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[UdfPacket](fun.getScalarScalaUdf)
   }
 
   private def unpackForeachWriter(fun: proto.ScalarScalaUDF): ForeachWriterPacket = {
-    Utils.deserialize[ForeachWriterPacket](
-      fun.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[ForeachWriterPacket](fun)
+  }
+
+  private def unpackScalarScalaUDF[T](fun: proto.ScalarScalaUDF): T = {
+    try {
+      logDebug(s"Unpack using class loader: ${Utils.getContextOrSparkClassLoader}")
+      Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
+    } catch {
+      case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
+        throw new ClassNotFoundException(
+          s"Failed to load class correctly due to ${e.getCause}. " +
+            "Make sure the artifact where the class is defined is installed by calling" +
+            " session.addArtifact.")

Review Comment:
   > wouldn't this trigger also for a class that is not actually used, just accidentally pulled in, and not captured by the CONNECT_SCALA_UDF_STUB_CLASSES config
   
   The code you highlighted would not catch that class, because the case you describe would fail with a `ClassNotFoundException` rather than a `NoSuchMethodException`.



##########
core/src/main/scala/org/apache/spark/util/StubClassLoader.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.xbean.asm9.{ClassWriter, Opcodes}
+
+/**
+ * [[ClassLoader]] that replaces missing classes with stubs, if they cannot be found. It will only
+ * do this for classes that are marked for stubbing.
+ *
+ * While this is generally not a good idea, in this particular case it is used to load lambdas
+ * whose capturing class contains unknown (and unneeded) classes. The lambda itself does not need
+ * the class and is therefore safe to replace with a stub.
+ */
+class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
+  extends ClassLoader(parent) {
+  override def findClass(name: String): Class[_] = {
+    if (!shouldStub(name)) {
+      throw new ClassNotFoundException(name)
+    }
+    val bytes = StubClassLoader.generateStub(name)
+    defineClass(name, bytes, 0, bytes.length)
+  }
+}
+
+object StubClassLoader {
+  def apply(parent: ClassLoader, binaryName: Seq[String]): StubClassLoader = {
+    new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
+  }
+
+  def generateStub(binaryName: String): Array[Byte] = {

Review Comment:
   When the user actually uses a class, the code is normally something like `val clazz = new Clazz(); clazz.callMethod`. When this happens, it fails earlier, at method resolution, before we get here (i.e. before the error could be thrown from the constructor at runtime).
   
   Throwing an error from the constructor would only help if the user calls `val clazz = new Clazz()` and does not use the class afterwards.
   
   If you ask why we do not stub the methods the user would call and throw the error there: the reason is that it is too hard :) We would need to scan the UDF contents. The `NoSuchMethodException` handling in `SparkConnectPlanner` is good enough to throw the error for us.
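
To make this concrete, a hedged sketch using the `StubClassLoader` quoted in this thread (the class name `com.example.Missing` is made up and assumed to be absent from the classpath):

```scala
import org.apache.spark.util.StubClassLoader

// The missing class matches a stub prefix, so a stub is generated instead of
// throwing ClassNotFoundException.
val loader = StubClassLoader(getClass.getClassLoader, Seq("com.example"))
val stubbed = loader.loadClass("com.example.Missing")

// The generated stub carries no user-defined methods, so resolving one fails;
// this is the NoSuchMethodException surfaced and handled in SparkConnectPlanner.
stubbed.getMethod("compute", classOf[Int])
```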





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278217228


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")

Review Comment:
   `stubPrefixes`?



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)
+      .version("3.5.0")
+      .stringConf
+      .toSequence
+      .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)

Review Comment:
   The `StubClassLoader` currently does not support multiple prefixes. Are we adding this somewhere in the future?
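
Note that the `apply` shown in the `StubClassLoader` diff quoted earlier in this thread already accepts a `Seq` of prefixes and matches with `exists`/`startsWith`; a minimal sketch of that matching logic:

```scala
// Sketch of the prefix matching used by StubClassLoader.apply.
val prefixes = Seq("org.apache.spark.sql.connect.client", "com.example.internal")
def shouldStub(name: String): Boolean = prefixes.exists(p => name.startsWith(p))

shouldStub("org.apache.spark.sql.connect.client.SparkResult") // true
shouldStub("com.example.internal.Helper")                     // true
shouldStub("org.example.user.MyClass")                        // false
```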





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278409336


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)

Review Comment:
   Rubber duck questions :-):
   What are the risks of being more aggressive and stubbing everything?
   Why would the risks be smaller if you were to do it only on the driver?
   Would it even work without doing it on executors? Executors execute this, so wouldn't they need the stubs to avoid running into ClassNotFound?
   
   In the description you write
   >  Java serializer might include unnecessary user code e.g. User classes used in the lambda definition signatures in the same class where the UDF is defined.
   
   but with it defaulting to Connect client classes only, it will not actually help for "User classes", will it?
   





[GitHub] [spark] LuciferYang commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1649308167

   Checked the maven tests with this PR; there are `10 TESTS FAILED`. Further confirmation is needed to determine whether all of them are related to this PR:
   
   ```
   FlatMapGroupsWithStateStreamingSuite:
   - flatMapGroupsWithState - streaming *** FAILED ***
     org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
     at scala.collection.Iterator.toStream(Iterator.scala:1417)
     at scala.collection.Iterator.toStream$(Iterator.scala:1416)
     at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
     at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
     at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
     at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
     ...
   - flatMapGroupsWithState - streaming - with initial state *** FAILED ***
     org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
     at scala.collection.Iterator.toStream(Iterator.scala:1417)
     at scala.collection.Iterator.toStream$(Iterator.scala:1416)
     at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
     at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
     at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
     at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
     ...
   - mapGroupsWithState - streaming *** FAILED ***
     org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
     at scala.collection.Iterator.toStream(Iterator.scala:1417)
     at scala.collection.Iterator.toStream$(Iterator.scala:1416)
     at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
     at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
     at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
     at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
     ...
   - mapGroupsWithState - streaming - with initial state *** FAILED ***
     org.apache.spark.SparkException: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
     at scala.collection.Iterator.toStream(Iterator.scala:1417)
     at scala.collection.Iterator.toStream$(Iterator.scala:1416)
     at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
     at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:354)
     at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:354)
     at scala.collection.AbstractIterator.toSeq(Iterator.scala:1431)
     ...
   - flatMapGroupsWithState *** FAILED ***
     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 489.0 failed 1 times, most recent failure: Lost task 0.0 in stage 489.0 (TID 1997) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:87)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
     at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
     at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
     at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
     at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
     at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
     ...
   - flatMapGroupsWithState - with initial state *** FAILED ***
     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 494.0 failed 1 times, most recent failure: Lost task 0.0 in stage 494.0 (TID 2006) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:87)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
     at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
     at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
     at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
     at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
     at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
     ...
   - mapGroupsWithState *** FAILED ***
     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 497.0 failed 1 times, most recent failure: Lost task 0.0 in stage 497.0 (TID 2013) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:87)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
     at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
     at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
     at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
     at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
     at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
     ...
   - mapGroupsWithState - with initial state *** FAILED ***
     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 502.0 failed 1 times, most recent failure: Lost task 0.0 in stage 502.0 (TID 2022) (localhost executor driver): java.lang.ClassCastException: org.apache.spark.sql.ClickState cannot be cast to org.apache.spark.sql.ClickState
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:87)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.hasNext(ArrowConverters.scala:100)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
   	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
   	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
   	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
   	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
   	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
   	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
   	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
   	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
   	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
   	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$4(Sp...
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
     at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:153)
     at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:183)
     at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2813)
     at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3252)
     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2812)
     at org.apache.spark.sql.connect.client.util.QueryTest.checkDataset(QueryTest.scala:54)
     ...
   - update class loader after stubbing: new session *** FAILED ***
     java.io.NotSerializableException: org.scalatest.Engine
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
     ...
   - update class loader after stubbing: same session *** FAILED ***
     java.io.NotSerializableException: org.scalatest.Engine
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
     ...
   *** 10 TESTS FAILED ***
   
   ```




[GitHub] [spark] vicennial commented on a diff in pull request #42069: [WIP] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1267724082


##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -543,11 +538,16 @@ private[spark] class Executor(
         // requires access to properties contained within (e.g. for access control).
         Executor.taskDeserializationProps.set(taskDescription.properties)
 
-        updateDependencies(
+        val updated = updateDependencies(
           taskDescription.artifacts.files,
           taskDescription.artifacts.jars,
           taskDescription.artifacts.archives,
           isolatedSession)
+        if (updated) {
+          // reset the thread class loader
+          val newIsolatedSession = getIsolatedSession(taskDescription)
+          Thread.currentThread.setContextClassLoader(newIsolatedSession.replClassLoader)
+        }

Review Comment:
   `updateDependencies` is called in several places; could we integrate this logic into the method itself to prevent cases where we end up using the outdated instance?
   
   Maybe we can leave the `Thread.currentThread.setContextClassLoader(newIsolatedSession.replClassLoader)` part here





[GitHub] [spark] vicennial commented on a diff in pull request #42069: [WIP] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1267729075


##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -860,6 +860,14 @@ private[spark] class Executor(
     }
   }
 
+  private def getIsolatedSession(
+      taskDescription: TaskDescription) = {
+    taskDescription.artifacts.state match {
+      case Some(jobArtifactState) =>
+        isolatedSessionCache.get(jobArtifactState.uuid, () => newSessionState(jobArtifactState))
+      case _ => defaultSessionState
+    }
+  }

Review Comment:
   Nit: This refactoring is unnecessary here. It also modifies [OSS-synced code](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L516-L521), so we should leave it as it is.





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278217512


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)
+      .version("3.5.0")
+      .stringConf
+      .toSequence
+      .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)

Review Comment:
   I am blind.





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278217400


##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -1101,11 +1147,21 @@ private[spark] class Executor(
           // Add it to our class loader
           val url = new File(root, localName).toURI.toURL
           if (!state.urlClassLoader.getURLs().contains(url)) {
-            logInfo(s"Adding $url to class loader")
+            logInfo(s"Adding $url to class loader ${state.sessionUUID}")
             state.urlClassLoader.addURL(url)
+            if (!isDefaultState(state.sessionUUID)) {
+              updated = true
+            }
           }
         }
       }
+      if (updated) {
+        // When a new url is added for non-default class loader, recreate the class loader
+        // to ensure all classes are updated.
+        state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, useStub = true)

Review Comment:
   nvm I get it.





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278214606


##########
connector/connect/client/jvm/src/test/resources/StubClassDummyUdf.scala:
##########


Review Comment:
   This source file cannot be on the classpath, otherwise sbt would include it in the server system classpath, so it lives outside, in resources. We only need the jars and binaries, which will be manually installed on the session classpath. Keeping the source file is just in case anyone wonders what the dummy UDF looks like.
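
For illustration, a hedged sketch of how such a prebuilt test jar could be installed on the session classpath from the client side (the jar path below is a placeholder, and `spark` is assumed to be a Connect session as in the suite above):

```scala
// Hypothetical usage: attach a prebuilt jar containing the dummy UDF classes so the
// server can add them to the session classpath via session.addArtifact.
val session = spark.newSession()
session.addArtifact("/path/to/StubClassDummyUdf.jar")
```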





[GitHub] [spark] hvanhovell closed pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath
URL: https://github.com/apache/spark/pull/42069




[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278299618


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)

Review Comment:
   So by default we will stub if some Spark Connect client code is pulled into the UDF, but not if serialization pulls in some other class that is unrelated to the client and not needed by the UDF, merely referenced by the enclosing class in a way that causes it to be pulled in?
   In that case the user would also get an error about ClassNotFound?
   Do we in that case want the user to add that class using addArtifact, even though it might be unclear to the user why it is relevant to the UDF?
   What are the disadvantages of just stubbing everything?





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278183598


##########
core/src/main/scala/org/apache/spark/util/StubClassLoader.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.xbean.asm9.{ClassWriter, Opcodes}
+
+/**
+ * [[ClassLoader]] that replaces missing classes with stubs, if they cannot be found. It will only
+ * do this for classes that are marked for stubbing.
+ *
+ * While this is generally not a good idea, in this particular case it is used to load lambdas
+ * whose capturing class contains unknown (and unneeded) classes. The lambda itself does not need
+ * the class and is therefore safe to replace with a stub.
+ */
+class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
+  extends ClassLoader(parent) {
+  override def findClass(name: String): Class[_] = {
+    if (!shouldStub(name)) {
+      throw new ClassNotFoundException(name)
+    }
+    val bytes = StubClassLoader.generateStub(name)
+    defineClass(name, bytes, 0, bytes.length)
+  }
+}
+
+object StubClassLoader {
+  def apply(parent: ClassLoader, binaryName: Seq[String]): StubClassLoader = {
+    new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
+  }
+
+  def generateStub(binaryName: String): Array[Byte] = {
+    // Convert binary names to internal names.
+    val name = binaryName.replace('.', '/')
+    val classWriter = new ClassWriter(0)
+    classWriter.visit(
+      49,
+      Opcodes.ACC_PUBLIC + Opcodes.ACC_SUPER,
+      name,
+      null,
+      "java/lang/Object",
+      null)
+    classWriter.visitSource(name + ".java", null)
+
+    // Generate constructor.
+    val ctorWriter = classWriter.visitMethod(

Review Comment:
   Do you want to cover the case where the default constructor is called? I had the code, but thought it was not that useful, since in 99% of cases loading fails on other constructors or a method scan before any constructor is called. Let me bring back the code...
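
   Roughly, bringing it back could look like the sketch below (assuming the `classWriter` and `binaryName` values in scope in `generateStub`; the exception type and message are just examples, not necessarily what the PR ends up with):
   ```scala
   // Sketch: make the stub's no-arg constructor throw instead of silently succeeding, so merely
   // instantiating a stubbed class already produces an actionable error.
   val ctor = classWriter.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null)
   ctor.visitCode()
   ctor.visitTypeInsn(Opcodes.NEW, "java/lang/ClassNotFoundException")
   ctor.visitInsn(Opcodes.DUP)
   ctor.visitLdcInsn(binaryName + " is a stub; add the real class via session.addArtifact(...)")
   ctor.visitMethodInsn(
     Opcodes.INVOKESPECIAL,
     "java/lang/ClassNotFoundException",
     "<init>",
     "(Ljava/lang/String;)V",
     false)
   ctor.visitInsn(Opcodes.ATHROW)
   ctor.visitMaxs(3, 1)
   ctor.visitEnd()
   ```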





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278208609


##########
connector/connect/client/jvm/src/test/resources/StubClassDummyUdf.scala:
##########


Review Comment:
   Location is a bit weird, why not in src/test/scala?





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278219437


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##########
@@ -161,7 +162,19 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
    */
   def classloader: ClassLoader = {
     val urls = getSparkConnectAddedJars :+ classDir.toUri.toURL
-    new URLClassLoader(urls.toArray, Utils.getContextOrSparkClassLoader)
+    val loader = if (SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
+      val stubClassLoader =
+        StubClassLoader(null, SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
+      new ChildFirstURLClassLoader(

Review Comment:
   Should this follow the same rules for classpath resolution we have on the executor?
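
   To make the question concrete, one possible wiring with executor-like "user classpath first" semantics would be the sketch below (argument order and parent chain here are assumptions; the actual constructor call in this PR is truncated in the diff above):
   ```scala
   // Sketch: try session artifacts (urls) first, fall back to the regular server classpath,
   // and only stub classes that are still missing and match the configured prefixes.
   val stubClassLoader = StubClassLoader(
     Utils.getContextOrSparkClassLoader,
     SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
   val loader = new ChildFirstURLClassLoader(urls.toArray, stubClassLoader)
   ```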





[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278220230


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")

Review Comment:
   can you change this in a follow-up?





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278426094


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2547,4 +2547,18 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
+    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+      .internal()
+      .doc("""
+          |Comma-separated list of binary names of classes/packages that should be stubbed during
+          |the Scala UDF serde and execution if not found on the server classpath.
+          |An empty list effectively disables stubbing for all missing classes.
+          |By default, the server stubs classes from the Scala client package.
+          |""".stripMargin)

Review Comment:
   Including @hvanhovell, as he suggested not stubbing user classes.





[GitHub] [spark] zhenlineo commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278214569


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -554,7 +554,7 @@ class SparkSession private[sql] (
     val command = proto.Command.newBuilder().setRegisterFunction(udf).build()
     val plan = proto.Plan.newBuilder().setCommand(command).build()
 
-    client.execute(plan)
+    client.execute(plan).asScala.foreach(_ => ())

Review Comment:
   Currently the registerUDF call is async. I do not think it is correct for registerUDF to be async, so I added this code to block until success or error.
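
   In other words (a minimal sketch of the pattern; error handling comes from consuming the iterator):
   ```scala
   // Draining the response iterator forces the RPC to complete, so a failure to deserialize or
   // register the UDF on the server surfaces here rather than on some later action.
   client.execute(plan).asScala.foreach(_ => ())
   ```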





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278301449


##########
core/src/main/scala/org/apache/spark/util/StubClassLoader.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.xbean.asm9.{ClassWriter, Opcodes}
+
+/**
+ * [[ClassLoader]] that replaces missing classes with stubs, if they cannot be found. It will
+ * only do this for classes that are marked for stubbing.
+ *
+ * While this is generally not a good idea, in this particular case it is used to load lambdas
+ * whose capturing class contains unknown (and unneeded) classes. The lambda itself does not need
+ * the class and therefore is safe to replace with a stub.
+ */
+class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
+  extends ClassLoader(parent) {
+  override def findClass(name: String): Class[_] = {
+    if (!shouldStub(name)) {
+      throw new ClassNotFoundException(name)
+    }
+    val bytes = StubClassLoader.generateStub(name)
+    defineClass(name, bytes, 0, bytes.length)
+  }
+}
+
+object StubClassLoader {
+  def apply(parent: ClassLoader, binaryName: Seq[String]): StubClassLoader = {
+    new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
+  }
+
+  def generateStub(binaryName: String): Array[Byte] = {
+    // Convert binary names to internal names.
+    val name = binaryName.replace('.', '/')
+    val classWriter = new ClassWriter(0)
+    classWriter.visit(
+      49,
+      Opcodes.ACC_PUBLIC + Opcodes.ACC_SUPER,
+      name,
+      null,
+      "java/lang/Object",
+      null)
+    classWriter.visitSource(name + ".java", null)
+
+    // Generate constructor.
+    val ctorWriter = classWriter.visitMethod(

Review Comment:
   In the description you write
   > If the user code is actually needed to execute the UDF, we will return an error message to suggest the user to add the missing classes using the addArtifact method.
   
   If I understand correctly, this stub should be throwing that error if it actually gets called?





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278301748


##########
core/src/main/scala/org/apache/spark/util/StubClassLoader.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.xbean.asm9.{ClassWriter, Opcodes}
+
+/**
+ * [[ClassLoader]] that replaces missing classes with stubs, if they cannot be found. It will
+ * only do this for classes that are marked for stubbing.
+ *
+ * While this is generally not a good idea, in this particular case it is used to load lambdas
+ * whose capturing class contains unknown (and unneeded) classes. The lambda itself does not need
+ * the class and therefore is safe to replace with a stub.
+ */
+class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
+  extends ClassLoader(parent) {
+  override def findClass(name: String): Class[_] = {
+    if (!shouldStub(name)) {
+      throw new ClassNotFoundException(name)
+    }
+    val bytes = StubClassLoader.generateStub(name)
+    defineClass(name, bytes, 0, bytes.length)
+  }
+}
+
+object StubClassLoader {
+  def apply(parent: ClassLoader, binaryName: Seq[String]): StubClassLoader = {
+    new StubClassLoader(parent, name => binaryName.exists(p => name.startsWith(p)))
+  }
+
+  def generateStub(binaryName: String): Array[Byte] = {

Review Comment:
   In the description you write
   
   > If the user code is actually needed to execute the UDF, we will return an error message to suggest the user to add the missing classes using the addArtifact method.
   
   If I understand correctly, this generated stub should be throwing that error if it actually gets called?





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278404936


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1504,15 +1506,24 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
   }
 
   private def unpackUdf(fun: proto.CommonInlineUserDefinedFunction): UdfPacket = {
-    Utils.deserialize[UdfPacket](
-      fun.getScalarScalaUdf.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[UdfPacket](fun.getScalarScalaUdf)
   }
 
   private def unpackForeachWriter(fun: proto.ScalarScalaUDF): ForeachWriterPacket = {
-    Utils.deserialize[ForeachWriterPacket](
-      fun.getPayload.toByteArray,
-      Utils.getContextOrSparkClassLoader)
+    unpackScalarScalaUDF[ForeachWriterPacket](fun)
+  }
+
+  private def unpackScalarScalaUDF[T](fun: proto.ScalarScalaUDF): T = {
+    try {
+      logDebug(s"Unpack using class loader: ${Utils.getContextOrSparkClassLoader}")
+      Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
+    } catch {
+      case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
+        throw new ClassNotFoundException(
+          s"Failed to load class correctly due to ${e.getCause}. " +
+            "Make sure the artifact where the class is defined is installed by calling" +
+            " session.addArtifact.")

Review Comment:
   Ah, smart. That's why you catch `NoSuchMethodException`: it suggests the class is actually used, whereas for a plain `ClassNotFoundException` a stub is generated. Now I finally understand your other comment with the explanation.
   That could be worth a rubber ducky comment here saying that "while a ClassNotFoundException may be caused by an unused class accidentally pulled in by the serializer, a NoSuchMethodException suggests actual use of the class".
   
   And @hvanhovell's comment about throwing from the default constructor is to cover the case where someone just calls the default constructor but doesn't use any methods?
   Also worth a rubber ducky comment :-)
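
   Something like the following, perhaps (a sketch of the same method from the diff with the suggested comments folded in; imports as already present in the planner):
   ```scala
   private def unpackScalarScalaUDF[T](fun: proto.ScalarScalaUDF): T = {
     try {
       Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
     } catch {
       // A ClassNotFoundException during deserialization may just mean the serializer dragged in
       // an unused class, which the StubClassLoader can safely stub. A NoSuchMethodException,
       // however, means the UDF actually calls into the missing class, so fail with an
       // actionable message instead.
       case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
         throw new ClassNotFoundException(
           s"Failed to load class correctly due to ${e.getCause}. " +
             "Make sure the artifact where the class is defined is installed by calling " +
             "session.addArtifact.")
     }
   }
   ```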





[GitHub] [spark] LuciferYang commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1655095285

   @zhenlineo Is there a chance to fix `SparkSessionE2ESuite` together in this PR? The Maven test still fails (it also failed before this PR, but it seems the server is also missing some artifact?), thanks.
   
   ```
   SparkSessionE2ESuite:
   - interrupt all - background queries, foreground interrupt *** FAILED ***
     The code passed to eventually never returned normally. Attempted 30 times over 20.242366750000002 seconds. Last failure message: Some("unexpected failure in q2: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult") was not empty Error not empty: Some(unexpected failure in q2: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult). (SparkSessionE2ESuite.scala:69)
   - interrupt all - foreground queries, background interrupt *** FAILED ***
     "org/apache/spark/sql/connect/client/SparkResult" did not contain "OPERATION_CANCELED" Unexpected exception: org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult (SparkSessionE2ESuite.scala:99)
   - interrupt tag *** FAILED ***
     The code passed to eventually never returned normally. Attempted 30 times over 20.238849625 seconds. Last failure message: ListBuffer() had length 0 instead of expected length 2 Interrupted operations: ListBuffer().. (SparkSessionE2ESuite.scala:197)
   - interrupt operation *** FAILED ***
     org.apache.spark.SparkException: org/apache/spark/sql/connect/client/SparkResult
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.toSparkThrowable(GrpcExceptionConverter.scala:53)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$.convert(GrpcExceptionConverter.scala:30)
     at org.apache.spark.sql.connect.client.GrpcExceptionConverter$$anon$1.hasNext(GrpcExceptionConverter.scala:38)
     at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:83)
     at org.apache.spark.sql.connect.client.SparkResult.operationId(SparkResult.scala:174)
     at org.apache.spark.sql.SparkSessionE2ESuite.$anonfun$new$31(SparkSessionE2ESuite.scala:241)
     at org.apache.spark.sql.connect.client.util.RemoteSparkSession.$anonfun$test$1(RemoteSparkSession.scala:235)
     at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
     at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
     at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
     ...
   ```




[GitHub] [spark] hvanhovell commented on a diff in pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42069:
URL: https://github.com/apache/spark/pull/42069#discussion_r1278217655


##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -548,6 +554,9 @@ private[spark] class Executor(
           taskDescription.artifacts.jars,
           taskDescription.artifacts.archives,
           isolatedSession)
+        // Always reset the thread class loader to ensure if any updates, all threads (not only
+        // the thread that updated the dependencies) can update to the new class loader.
+        Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)

Review Comment:
   I am pretty sure we do this elsewhere as well.
   
   Are you also unsetting it once you are done?
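
   The usual save-and-restore pattern would look roughly like this (a sketch using the names from the diff; whether the PR actually does this here is exactly the question):
   ```scala
   // Remember the previous loader and restore it after the task, so one task's session-specific
   // class loader does not leak onto the pooled executor thread.
   val previousLoader = Thread.currentThread.getContextClassLoader
   Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
   try {
     // ... run the task body ...
   } finally {
     Thread.currentThread.setContextClassLoader(previousLoader)
   }
   ```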





[GitHub] [spark] LuciferYang commented on pull request #42069: [SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42069:
URL: https://github.com/apache/spark/pull/42069#issuecomment-1652129838

   > 2 new udf loading test failures: they need the client jar file, so the test cannot run with mvn clean. I will update a warning to help with these two failures.
   
   Yes, we should make this clear, as I've noticed that many developers get into the habit of using the `build/mvn package test` command for testing, which ends up causing test failures.

