You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/18 23:31:50 UTC

[spark] branch master updated: [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8323c0c48de [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED
8323c0c48de is described below

commit 8323c0c48de7f498ef2452059f737a167586b98d
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Tue Sep 19 08:31:15 2023 +0900

    [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED
    
    ### What changes were proposed in this pull request?
    
    Add a test checking that queries (including the special case of local relations) transition to the FINISHED state even if the client does not consume the results.
    
    ### Why are the changes needed?
    
    Adds a test for SPARK-45133.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This adds tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42910 from juliuszsompolski/SPARK-45133-followup.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/connect/SparkConnectServerTest.scala | 29 ++++++++++++-
 .../service/SparkConnectServiceE2ESuite.scala      | 48 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
index eddd1c6be72..7b02377f484 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
@@ -16,14 +16,19 @@
  */
 package org.apache.spark.sql.connect
 
-import java.util.UUID
+import java.util.{TimeZone, UUID}
 
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.arrow.memory.RootAllocator
 import org.scalatest.concurrent.{Eventually, TimeLimits}
 import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.connect.client.{CloseableIterator, CustomSparkConnectBlockingStub, ExecutePlanResponseReattachableIterator, GrpcRetryHandler, SparkConnectClient, WrappedCloseableIterator}
+import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
 import org.apache.spark.sql.connect.common.config.ConnectCommon
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.dsl.MockRemoteSession
@@ -43,6 +48,8 @@ trait SparkConnectServerTest extends SharedSparkSession {
 
   val eventuallyTimeout = 30.seconds
 
+  val allocator = new RootAllocator()
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     // Other suites using mocks leave a mess in the global executionManager,
@@ -60,6 +67,7 @@ trait SparkConnectServerTest extends SharedSparkSession {
 
   override def afterAll(): Unit = {
     SparkConnectService.stop()
+    allocator.close()
     super.afterAll()
   }
 
@@ -127,6 +135,19 @@ trait SparkConnectServerTest extends SharedSparkSession {
     proto.Plan.newBuilder().setRoot(dsl.sql(query)).build()
   }
 
+  protected def buildLocalRelation[A <: Product: TypeTag](data: Seq[A]) = {
+    val encoder = ScalaReflection.encoderFor[A]
+    val arrowData =
+      ArrowSerializer.serialize(data.iterator, encoder, allocator, TimeZone.getDefault.getID)
+    val localRelation = proto.LocalRelation
+      .newBuilder()
+      .setData(arrowData)
+      .setSchema(encoder.schema.json)
+      .build()
+    val relation = proto.Relation.newBuilder().setLocalRelation(localRelation).build()
+    proto.Plan.newBuilder().setRoot(relation).build()
+  }
+
   protected def getReattachableIterator(
       stubIterator: CloseableIterator[proto.ExecutePlanResponse]) = {
     // This depends on the wrapping in CustomSparkConnectBlockingStub.executePlanReattachable:
@@ -188,6 +209,12 @@ trait SparkConnectServerTest extends SharedSparkSession {
     executions.head
   }
 
+  protected def eventuallyGetExecutionHolder: ExecuteHolder = {
+    Eventually.eventually(timeout(eventuallyTimeout)) {
+      getExecutionHolder
+    }
+  }
+
   protected def withClient(f: SparkConnectClient => Unit): Unit = {
     val client = SparkConnectClient
       .builder()
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
new file mode 100644
index 00000000000..14ecc9a2e95
--- /dev/null
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.connect.SparkConnectServerTest
+
+class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
+
+  test("SPARK-45133 query should reach FINISHED state when results are not consumed") {
+    withRawBlockingStub { stub =>
+      val iter =
+        stub.executePlan(buildExecutePlanRequest(buildPlan("select * from range(1000000)")))
+      iter.hasNext
+      val execution = eventuallyGetExecutionHolder
+      Eventually.eventually(timeout(30.seconds)) {
+        execution.eventsManager.status == ExecuteStatus.Finished
+      }
+    }
+  }
+
+  test("SPARK-45133 local relation should reach FINISHED state when results are not consumed") {
+    withClient { client =>
+      val iter = client.execute(buildLocalRelation((1 to 1000000).map(i => (i, i + 1))))
+      iter.hasNext
+      val execution = eventuallyGetExecutionHolder
+      Eventually.eventually(timeout(30.seconds)) {
+        execution.eventsManager.status == ExecuteStatus.Finished
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org