You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/18 23:31:50 UTC
[spark] branch master updated: [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8323c0c48de [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED
8323c0c48de is described below
commit 8323c0c48de7f498ef2452059f737a167586b98d
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Tue Sep 19 08:31:15 2023 +0900
[SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED
### What changes were proposed in this pull request?
Add a test checking that queries (including the special case of local relations) transition to the FINISHED state even if the client does not consume the results.
### Why are the changes needed?
Add test for SPARK-45133.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This adds tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42910 from juliuszsompolski/SPARK-45133-followup.
Authored-by: Juliusz Sompolski <ju...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../spark/sql/connect/SparkConnectServerTest.scala | 29 ++++++++++++-
.../service/SparkConnectServiceE2ESuite.scala | 48 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 1 deletion(-)
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
index eddd1c6be72..7b02377f484 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
@@ -16,14 +16,19 @@
*/
package org.apache.spark.sql.connect
-import java.util.UUID
+import java.util.{TimeZone, UUID}
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.arrow.memory.RootAllocator
import org.scalatest.concurrent.{Eventually, TimeLimits}
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.connect.client.{CloseableIterator, CustomSparkConnectBlockingStub, ExecutePlanResponseReattachableIterator, GrpcRetryHandler, SparkConnectClient, WrappedCloseableIterator}
+import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
import org.apache.spark.sql.connect.common.config.ConnectCommon
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.dsl.MockRemoteSession
@@ -43,6 +48,8 @@ trait SparkConnectServerTest extends SharedSparkSession {
val eventuallyTimeout = 30.seconds
+ val allocator = new RootAllocator()
+
override def beforeAll(): Unit = {
super.beforeAll()
// Other suites using mocks leave a mess in the global executionManager,
@@ -60,6 +67,7 @@ trait SparkConnectServerTest extends SharedSparkSession {
override def afterAll(): Unit = {
SparkConnectService.stop()
+ allocator.close()
super.afterAll()
}
@@ -127,6 +135,19 @@ trait SparkConnectServerTest extends SharedSparkSession {
proto.Plan.newBuilder().setRoot(dsl.sql(query)).build()
}
+ protected def buildLocalRelation[A <: Product: TypeTag](data: Seq[A]) = {
+ val encoder = ScalaReflection.encoderFor[A]
+ val arrowData =
+ ArrowSerializer.serialize(data.iterator, encoder, allocator, TimeZone.getDefault.getID)
+ val localRelation = proto.LocalRelation
+ .newBuilder()
+ .setData(arrowData)
+ .setSchema(encoder.schema.json)
+ .build()
+ val relation = proto.Relation.newBuilder().setLocalRelation(localRelation).build()
+ proto.Plan.newBuilder().setRoot(relation).build()
+ }
+
protected def getReattachableIterator(
stubIterator: CloseableIterator[proto.ExecutePlanResponse]) = {
// This depends on the wrapping in CustomSparkConnectBlockingStub.executePlanReattachable:
@@ -188,6 +209,12 @@ trait SparkConnectServerTest extends SharedSparkSession {
executions.head
}
+ protected def eventuallyGetExecutionHolder: ExecuteHolder = {
+ Eventually.eventually(timeout(eventuallyTimeout)) {
+ getExecutionHolder
+ }
+ }
+
protected def withClient(f: SparkConnectClient => Unit): Unit = {
val client = SparkConnectClient
.builder()
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
new file mode 100644
index 00000000000..14ecc9a2e95
--- /dev/null
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.connect.SparkConnectServerTest
+
/**
 * End-to-end tests for SPARK-45133: executions must transition to the FINISHED
 * state on the server even when the client never consumes the result stream.
 */
class SparkConnectServiceE2ESuite extends SparkConnectServerTest {

  test("SPARK-45133 query should reach FINISHED state when results are not consumed") {
    withRawBlockingStub { stub =>
      val iter =
        stub.executePlan(buildExecutePlanRequest(buildPlan("select * from range(1000000)")))
      // Trigger the execution on the server, but deliberately do not consume the results.
      assert(iter.hasNext)
      val execution = eventuallyGetExecutionHolder
      // `eventually` only retries on thrown exceptions, so the condition must be wrapped
      // in `assert` — a bare Boolean expression would pass without ever being checked.
      Eventually.eventually(timeout(30.seconds)) {
        assert(execution.eventsManager.status == ExecuteStatus.Finished)
      }
    }
  }

  test("SPARK-45133 local relation should reach FINISHED state when results are not consumed") {
    withClient { client =>
      val iter = client.execute(buildLocalRelation((1 to 1000000).map(i => (i, i + 1))))
      // Trigger the execution, but do not consume the results.
      assert(iter.hasNext)
      val execution = eventuallyGetExecutionHolder
      Eventually.eventually(timeout(30.seconds)) {
        assert(execution.eventsManager.status == ExecuteStatus.Finished)
      }
    }
  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org