You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/24 05:22:56 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request, #38785: [SPARK-41249][SS] Add acceptance test for self-union on streaming query

HeartSaVioR opened a new pull request, #38785:
URL: https://github.com/apache/spark/pull/38785

   ### What changes were proposed in this pull request?
   
   This PR proposes to add a new test suite specifically for self-union tests on streaming query. The test cases are acceptance tests for 4 different cases, DSv1 vs DSv2 / DataStreamReader API vs table API.
   
   ### Why are the changes needed?
   
   This PR brings more test coverage on streaming workloads.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New test suite.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38785: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38785:
URL: https://github.com/apache/spark/pull/38785#issuecomment-1326320125

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR closed pull request #38785: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38785: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query
URL: https://github.com/apache/spark/pull/38785


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38785: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38785:
URL: https://github.com/apache/spark/pull/38785#discussion_r1031295704


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.teststream", classOf[InMemoryStreamTableCatalog].getName)
+  }
+
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.clear()
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("self-union, DSv1, read via DataStreamReader API") {
+    withTempPath { dir =>
+      val dataLocation = dir.getAbsolutePath
+      spark.range(1, 4).write.format("parquet").save(dataLocation)
+
+      val streamDf = spark.readStream.format("parquet")
+        .schema(StructType(Seq(StructField("id", LongType)))).load(dataLocation)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf)(
+        ProcessAllAvailable(),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv1, read via table API") {
+    withTable("parquet_streaming_tbl") {
+      spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")
+
+      val streamDf = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        Execute { _ =>
+          spark.range(1, 4).selectExpr("id AS key")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv2, read via DataStreamReader API") {
+    val inputData = MemoryStream[Int]
+
+    val streamDf = inputData.toDF()
+    val unionedDf = streamDf.union(streamDf)
+
+    testStream(unionedDf)(
+      AddData(inputData, 1, 2, 3),
+      CheckLastBatch(1, 2, 3, 1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 6)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 6)
+        true
+      }
+    )
+  }
+
+  test("self-union, DSv2, read via table API") {
+    val tblName = "teststream.table_name"
+    withTable(tblName) {
+      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+      val stream = MemoryStream[Int]
+      val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+      val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+      table.asInstanceOf[InMemoryStreamTable].setStream(stream)
+
+      val streamDf = spark.readStream.table(tblName)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf) (
+        AddData(stream, 1, 2, 3),

Review Comment:
   I think so - we still read from table via table API here. (We use `readStream.table`.) It's just that the table is backed by MemoryStream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38785: [SPARK-41249][SS] Add acceptance test for self-union on streaming query

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38785:
URL: https://github.com/apache/spark/pull/38785#issuecomment-1325972411

   cc. @zsxwing @viirya Please take a look. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a diff in pull request #38785: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #38785:
URL: https://github.com/apache/spark/pull/38785#discussion_r1031284628


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.teststream", classOf[InMemoryStreamTableCatalog].getName)
+  }
+
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.clear()
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("self-union, DSv1, read via DataStreamReader API") {
+    withTempPath { dir =>
+      val dataLocation = dir.getAbsolutePath
+      spark.range(1, 4).write.format("parquet").save(dataLocation)
+
+      val streamDf = spark.readStream.format("parquet")
+        .schema(StructType(Seq(StructField("id", LongType)))).load(dataLocation)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf)(
+        ProcessAllAvailable(),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv1, read via table API") {
+    withTable("parquet_streaming_tbl") {
+      spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")
+
+      val streamDf = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        Execute { _ =>
+          spark.range(1, 4).selectExpr("id AS key")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv2, read via DataStreamReader API") {
+    val inputData = MemoryStream[Int]
+
+    val streamDf = inputData.toDF()
+    val unionedDf = streamDf.union(streamDf)
+
+    testStream(unionedDf)(
+      AddData(inputData, 1, 2, 3),
+      CheckLastBatch(1, 2, 3, 1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 6)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 6)
+        true
+      }
+    )
+  }
+
+  test("self-union, DSv2, read via table API") {
+    val tblName = "teststream.table_name"
+    withTable(tblName) {
+      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+      val stream = MemoryStream[Int]
+      val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+      val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+      table.asInstanceOf[InMemoryStreamTable].setStream(stream)
+
+      val streamDf = spark.readStream.table(tblName)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf) (
+        AddData(stream, 1, 2, 3),

Review Comment:
   If the data is written into the table location, instead of through the stream (i.e. like `self-union, DSv1, read via table API`), is it also working?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38785: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38785:
URL: https://github.com/apache/spark/pull/38785#discussion_r1031295704


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.teststream", classOf[InMemoryStreamTableCatalog].getName)
+  }
+
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.clear()
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("self-union, DSv1, read via DataStreamReader API") {
+    withTempPath { dir =>
+      val dataLocation = dir.getAbsolutePath
+      spark.range(1, 4).write.format("parquet").save(dataLocation)
+
+      val streamDf = spark.readStream.format("parquet")
+        .schema(StructType(Seq(StructField("id", LongType)))).load(dataLocation)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf)(
+        ProcessAllAvailable(),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv1, read via table API") {
+    withTable("parquet_streaming_tbl") {
+      spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")
+
+      val streamDf = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        Execute { _ =>
+          spark.range(1, 4).selectExpr("id AS key")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv2, read via DataStreamReader API") {
+    val inputData = MemoryStream[Int]
+
+    val streamDf = inputData.toDF()
+    val unionedDf = streamDf.union(streamDf)
+
+    testStream(unionedDf)(
+      AddData(inputData, 1, 2, 3),
+      CheckLastBatch(1, 2, 3, 1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 6)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 6)
+        true
+      }
+    )
+  }
+
+  test("self-union, DSv2, read via table API") {
+    val tblName = "teststream.table_name"
+    withTable(tblName) {
+      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+      val stream = MemoryStream[Int]
+      val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+      val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+      table.asInstanceOf[InMemoryStreamTable].setStream(stream)
+
+      val streamDf = spark.readStream.table(tblName)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf) (
+        AddData(stream, 1, 2, 3),

Review Comment:
   I think so - here we also read from table via table API. (We use `readStream.table`.) It's just that the table is backed by MemoryStream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org