You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/08/16 09:52:04 UTC

[GitHub] [spark] LuciferYang opened a new pull request, #42512: [SPARK-44824][CONNECT][TESTS][3.5] Reset `ammoniteOut` in the `afterEach` method of `ReplE2ESuite`

LuciferYang opened a new pull request, #42512:
URL: https://github.com/apache/spark/pull/42512

   ### What changes were proposed in this pull request?
   This PR add `ammoniteOut.reset()` in the `afterEach` method of `ReplE2ESuite` to ensure that the 'output' used for assertions in each test case is only related to the current case and not all content.
   
   ### Why are the changes needed?
   The current `ammoniteOut` records the output content of all executed tests, without isolating between cases. This can lead to unexpected assertion results. 
   For example, adding 'assertContains("""String = "[MyTestClass(1), MyTestClass(3)]"""", output)' in the following test case would still pass the test because it is a result content printed to `ammoniteOut` in the previous test case. 
   
   https://github.com/apache/spark/blob/2be20e54a2222f6cdf64e8486d1910133b43665f/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala#L283-L290
   
   Hence, we need to clear the content in `ammoniteOut` after each test to achieve isolation between test cases.
   
   ### Does this PR introduce _any_ user-facing change?
   No, just for test
   
   
   ### How was this patch tested?
   - Pass Github Actions
   - Manual check
   
   Prints the `output` after `val output = runCommandsInShell(input)` in the the case `streaming works with REPL generated code`
   
   https://github.com/apache/spark/blob/2be20e54a2222f6cdf64e8486d1910133b43665f/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala#L313-L318
   
   run 
   
   ```
   build/sbt "connect-client-jvm/testOnly org.apache.spark.sql.application.ReplE2ESuite" -Phive 
   ```
   
   **Before**: we can see the content of all test cases that have been executed in the `ReplE2ESuite`
   
   
   ```
   Spark session available as 'spark'.
      _____                  __      ______                            __
     / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
     \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
    ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
   /____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
       /_/
   
   @  
   
   @ spark.sql("select 1").collect() 
   res0: Array[org.apache.spark.sql.Row] = Array([1])
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ class A(x: Int) { def get = x * 5 + 19 } 
   defined class A
   
   @ def dummyUdf(x: Int): Int = new A(x).get 
   defined function dummyUdf
   
   @ val myUdf = udf(dummyUdf _) 
   myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ spark.range(5).select(myUdf(col("id"))).as[Int].collect() 
   res5: Array[Int] = Array(19, 24, 29, 34, 39)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ class A(x: Int) { def get = x * 42 + 5 } 
   defined class A
   
   @ val myUdf = udf((x: Int) => new A(x).get) 
   myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ spark.range(5).select(myUdf(col("id"))).as[Int].collect() 
   res9: Array[Int] = Array(5, 47, 89, 131, 173)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ class A(x: Int) { def get = x * 7 } 
   defined class A
   
   @ val myUdf = udf((x: Int) => new A(x).get) 
   myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ val modifiedUdf = myUdf.withName("myUdf").asNondeterministic() 
   modifiedUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ spark.range(5).select(modifiedUdf(col("id"))).as[Int].collect() 
   res14: Array[Int] = Array(0, 7, 14, 21, 28)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ spark.range(10).filter(n => n % 2 == 0).collect() 
   res16: Array[java.lang.Long] = Array(0L, 2L, 4L, 6L, 8L)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ import java.nio.file.Paths 
   import java.nio.file.Paths
   
   @ def classLoadingTest(x: Int): Int = {
       val classloader =
         Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)
       val cls = Class.forName("com.example.Hello$", true, classloader)
       val module = cls.getField("MODULE$").get(null)
       cls.getMethod("test").invoke(module).asInstanceOf[Int]
     } 
   defined function classLoadingTest
   
   @ val classLoaderUdf = udf(classLoadingTest _) 
   classLoaderUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @  
   
   @ val jarPath = Paths.get("/Users/yangjie01/SourceCode/git/spark-mine-sbt/connector/connect/client/jvm/src/test/resources/TestHelloV2_2.12.jar").toUri 
   jarPath: java.net.URI = file:///Users/yangjie01/SourceCode/git/spark-mine-sbt/connector/connect/client/jvm/src/test/resources/TestHelloV2_2.12.jar
   
   @ spark.addArtifact(jarPath) 
   
   
   @  
   
   @ spark.range(5).select(classLoaderUdf(col("id"))).as[Int].collect() 
   res23: Array[Int] = Array(2, 2, 2, 2, 2)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ import org.apache.spark.sql.api.java._ 
   import org.apache.spark.sql.api.java._
   
   @ import org.apache.spark.sql.types.LongType 
   import org.apache.spark.sql.types.LongType
   
   @  
   
   @ val javaUdf = udf(new UDF1[Long, Long] {
       override def call(num: Long): Long = num * num + 25L
     }, LongType).asNondeterministic() 
   javaUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ spark.range(5).select(javaUdf(col("id"))).as[Long].collect() 
   res28: Array[Long] = Array(25L, 26L, 29L, 34L, 41L)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ import org.apache.spark.sql.api.java._ 
   import org.apache.spark.sql.api.java._
   
   @ import org.apache.spark.sql.types.LongType 
   import org.apache.spark.sql.types.LongType
   
   @  
   
   @ spark.udf.register("javaUdf", new UDF1[Long, Long] {
       override def call(num: Long): Long = num * num * num + 250L
     }, LongType) 
   
   
   @ spark.sql("select javaUdf(id) from range(5)").as[Long].collect() 
   res33: Array[Long] = Array(250L, 251L, 258L, 277L, 314L)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ class A(x: Int) { def get = x * 100 } 
   defined class A
   
   @ val myUdf = udf((x: Int) => new A(x).get) 
   myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ spark.udf.register("dummyUdf", myUdf) 
   res37: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ spark.sql("select dummyUdf(id) from range(5)").as[Long].collect() 
   res38: Array[Long] = Array(0L, 100L, 200L, 300L, 400L)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ class A(x: Int) { def get = x * 15 } 
   defined class A
   
   @ spark.udf.register("directUdf", (x: Int) => new A(x).get) 
   res41: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ spark.sql("select directUdf(id) from range(5)").as[Long].collect() 
   res42: Array[Long] = Array(0L, 15L, 30L, 45L, 60L)
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value") 
   df: org.apache.spark.sql.package.DataFrame = [id: string, value: int]
   
   @ spark.udf.register("simpleUDF", (v: Int) => v * v) 
   res45: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ df.select($"id", call_udf("simpleUDF", $"value")).collect() 
   res46: Array[org.apache.spark.sql.Row] = Array([id1,1], [id2,16], [id3,25])
   
   @        
   
   semaphore.release() 
   
   
   @  
   
   @ val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value") 
   df: org.apache.spark.sql.package.DataFrame = [id: string, value: int]
   
   @ spark.udf.register("simpleUDF", (v: Int) => v * v) 
   res49: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ df.select($"id", call_function("simpleUDF", $"value")).collect() 
   res50: Array[org.apache.spark.sql.Row] = Array([id1,1], [id2,16], [id3,25])
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ case class MyTestClass(value: Int) 
   defined class MyTestClass
   
   @ spark.range(4).
       filter($"id" % 2 === 1).
       select($"id".cast("int").as("value")).
       as[MyTestClass].
       collect().
       map(mtc => s"MyTestClass(${mtc.value})").
       mkString("[", ", ", "]") 
   res53: String = "[MyTestClass(1), MyTestClass(3)]"
   
   @            
   
   @ semaphore.release() 
   
   
   @  
   
   @ case class MyTestClass(value: Int) 
   defined class MyTestClass
   
   @ spark.range(2).map(i => MyTestClass(i.toInt)).collect() 
   res56: Array[MyTestClass] = Array(MyTestClass(0), MyTestClass(1))
   
   @        
   
   @ semaphore.release() 
   
   
   @  
   
   @ val add1 = udf((i: Long) => i + 1) 
   add1: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ val query = {
       spark.readStream
           .format("rate")
           .option("rowsPerSecond", "10")
           .option("numPartitions", "1")
           .load()
           .withColumn("value", add1($"value"))
           .writeStream
           .format("memory")
           .queryName("my_sink")
           .start()
     } 
   query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@79cdf37e
   
   @ var progress = query.lastProgress 
   progress: org.apache.spark.sql.streaming.StreamingQueryProgress = null
   
   @ while (query.isActive && (progress == null || progress.numInputRows == 0)) {
       query.awaitTermination(100)
       progress = query.lastProgress
     } 
   
   
   @ val noException = query.exception.isEmpty 
   noException: Boolean = true
   
   @ query.stop() 
   
   
   @  
   
   @ semaphore.release() 
   ```
   
   **After**: we can only see the content that is related to the test case `streaming works with REPL generated code`
   
   ```
   @  
   
   @ val add1 = udf((i: Long) => i + 1) 
   add1: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
     Array(
   ...
   
   @ val query = {
       spark.readStream
           .format("rate")
           .option("rowsPerSecond", "10")
           .option("numPartitions", "1")
           .load()
           .withColumn("value", add1($"value"))
           .writeStream
           .format("memory")
           .queryName("my_sink")
           .start()
     } 
   query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@5429e19b
   
   @ var progress = query.lastProgress 
   progress: org.apache.spark.sql.streaming.StreamingQueryProgress = null
   
   @ while (query.isActive && (progress == null || progress.numInputRows == 0)) {
       query.awaitTermination(100)
       progress = query.lastProgress
     } 
   
   
   @ val noException = query.exception.isEmpty 
   noException: Boolean = true
   
   @ query.stop() 
   
   
   @  
   
   @ semaphore.release() 
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #42512: [SPARK-44824][CONNECT][TESTS][3.5] Reset `ammoniteOut` in the `afterEach` method of `ReplE2ESuite`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42512:
URL: https://github.com/apache/spark/pull/42512#issuecomment-1681129385

   @LuciferYang no worries, you can manually cherry-pick it next time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #42512: [SPARK-44824][CONNECT][TESTS][3.5] Reset `ammoniteOut` in the `afterEach` method of `ReplE2ESuite`

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42512:
URL: https://github.com/apache/spark/pull/42512#issuecomment-1680310294

   @hvanhovell @dongjoon-hyun I’m very sorry for the oversight, https://github.com/apache/spark/pull/42509 was not merged into branch-3.5 due to my operational error. I have submitted a separate PR, could you please review it when you have time? Thanks ~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell closed pull request #42512: [SPARK-44824][CONNECT][TESTS][3.5] Reset `ammoniteOut` in the `afterEach` method of `ReplE2ESuite`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #42512: [SPARK-44824][CONNECT][TESTS][3.5] Reset `ammoniteOut` in the `afterEach` method of `ReplE2ESuite`
URL: https://github.com/apache/spark/pull/42512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #42512: [SPARK-44824][CONNECT][TESTS][3.5] Reset `ammoniteOut` in the `afterEach` method of `ReplE2ESuite`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42512:
URL: https://github.com/apache/spark/pull/42512#issuecomment-1681129504

   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org