You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/12 15:03:17 UTC

[GitHub] [spark] gaborgsomogyi commented on a change in pull request #31818: [SPARK-34726][SQL][2.4] Fix collectToPython timeouts

gaborgsomogyi commented on a change in pull request #31818:
URL: https://github.com/apache/spark/pull/31818#discussion_r593238402



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
##########
@@ -1586,6 +1591,30 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-34726: Fix collectToPython timeouts") {
+    val listener = new QueryExecutionListener {
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+        // Longer than 15s in `PythonServer.setupOneConnectionServer`
+        Thread.sleep(20 * 1000)
+      }
+    }
+    spark.listenerManager.register(listener)
+
+    val Array(port: Int, secretToPython: String) = spark.range(5).toDF().collectToPython()
+
+    // Mimic Python side
+    val socket = new Socket(InetAddress.getByAddress(Array(127, 0, 0, 1)), port)
+    val authHelper = new SocketAuthHelper(new SparkConf()) {
+      override val secret: String = secretToPython
+    }
+    authHelper.authToServer(socket)
+    Source.fromInputStream(socket.getInputStream)
+
+    spark.listenerManager.unregister(listener)

Review comment:
       I would put this `unregister` call into a `finally` block to be on the safe side.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
##########
@@ -1586,6 +1591,30 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-34726: Fix collectToPython timeouts") {
+    val listener = new QueryExecutionListener {
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+        // Longer than 15s in `PythonServer.setupOneConnectionServer`
+        Thread.sleep(20 * 1000)

Review comment:
       Just for my own understanding: does this mean the test waits 15 seconds to pass?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org