Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/02 02:32:46 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request, #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink

HeartSaVioR opened a new pull request, #37368:
URL: https://github.com/apache/spark/pull/37368

   Credit to @pranavanand for figuring out the issue and providing the broken test code!
   
   ### What changes were proposed in this pull request?
   
   This PR proposes to refresh the destination catalog table when a streaming query writes to the catalog table via a DSv1 sink.
   
   ### Why are the changes needed?
   
   It has been a long-standing issue that a streaming query is not aware of the catalog table (as opposed to a table provided by DSv2), so the streaming query has had no way to refresh the catalog table after updating it. As a side effect of SPARK-39564, Spark now carries the information about the destination catalog table into MicroBatchExecution, which makes it possible to refresh the table.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New UT.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a diff in pull request #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #37368:
URL: https://github.com/apache/spark/pull/37368#discussion_r935082833


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala:
##########
@@ -445,6 +445,44 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("SPARK-39940: refresh table when streaming query writes to the catalog table via DSv1") {
+    withTable("tbl1", "tbl2") {
+      withTempDir { dir =>
+        val baseTbls = new File(dir, "tables")
+        val tbl1File = new File(baseTbls, "tbl1")
+        val tbl2File = new File(baseTbls, "tbl2")
+        val checkpointLocation = new File(dir, "checkpoint")
+
+        val format = "parquet"
+        Seq((1, 2)).toDF("i", "d")
+          .write.format(format).option("path", tbl1File.getCanonicalPath).saveAsTable("tbl1")
+
+        val query = spark.readStream.format(format).table("tbl1")
+          .writeStream.format(format)
+          .option("checkpointLocation", checkpointLocation.getCanonicalPath)
+          .option("path", tbl2File.getCanonicalPath)
+          .toTable("tbl2")
+
+        try {
+          query.processAllAvailable()
+          checkAnswer(spark.table("tbl2").sort($"i"), Seq(Row(1, 2)))
+
+          Seq((3, 4)).toDF("i", "d")
+            .write.format(format).option("path", tbl1File.getCanonicalPath)
+            .mode(SaveMode.Append).saveAsTable("tbl1")
+
+          query.processAllAvailable()
+          checkAnswer(spark.table("tbl2").sort($"i"), Seq(Row(1, 2), Row(3, 4)))
+
+          assert(query.exception.isEmpty, s"No exception should happen in streaming query: " +

Review Comment:
   ```suggestion
             assert(query.exception.isEmpty, "No exception should happen in streaming query: " +
   ```





[GitHub] [spark] HeartSaVioR closed pull request #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink
URL: https://github.com/apache/spark/pull/37368




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #37368:
URL: https://github.com/apache/spark/pull/37368#discussion_r935061578


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -680,7 +680,14 @@ class MicroBatchExecution(
     val batchSinkProgress: Option[StreamWriterCommitProgress] = reportTimeTaken("addBatch") {
       SQLExecution.withNewExecutionId(lastExecution) {
         sink match {
-          case s: Sink => s.addBatch(currentBatchId, nextBatch)
+          case s: Sink =>
+            s.addBatch(currentBatchId, nextBatch)
+            // DSv2 write node has a mechanism to invalidate DSv2 relation, but there is no
+            // corresponding one for DSv1. Given we have an information of catalog table for sink,
+            // we can refresh the catalog table once the write has succeeded.
+            plan.catalogTable.foreach { tbl =>

Review Comment:
   Here I only deal with the DSv1 path, since the DSv2 streaming writer node (`WriteToDataSourceV2Exec`) already contains a mechanism for refreshing the cache. I think that covers the DSv2 table case, and I can't think of any coordination needed between a DSv2 sink and a catalog table, but please correct me if I'm missing something.
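
The pattern in the hunk above (write the batch first, then refresh the table only when a catalog table is known) can be sketched in plain Scala without Spark. This is a rough illustration, not the code in the PR: `TableId`, `MemorySink`, `FakeCatalog`, and `RefreshAfterWrite.runBatch` are hypothetical stand-ins, and the local `Sink` trait only mimics the shape of the `addBatch` call shown in the diff.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration only; none of these are Spark classes.
final case class TableId(name: String)

// Mimics the shape of the addBatch call in the diff, with a simplified row type.
trait Sink {
  def addBatch(batchId: Long, rows: Seq[(Int, Int)]): Unit
}

// Keeps written batches in memory so the sketch runs without Spark.
final class MemorySink extends Sink {
  val written: ArrayBuffer[(Long, Seq[(Int, Int)])] = ArrayBuffer.empty

  def addBatch(batchId: Long, rows: Seq[(Int, Int)]): Unit = {
    written += ((batchId, rows))
    ()
  }
}

// Records refresh requests, standing in for whatever invalidates cached table state.
final class FakeCatalog {
  val refreshed: ArrayBuffer[TableId] = ArrayBuffer.empty

  def refreshTable(id: TableId): Unit = {
    refreshed += id
    ()
  }
}

object RefreshAfterWrite {
  // Write first; if addBatch throws, the refresh below is never reached.
  // The refresh runs only when the destination catalog table is known (Some).
  def runBatch(
      sink: Sink,
      catalogTable: Option[TableId],
      catalog: FakeCatalog,
      batchId: Long,
      rows: Seq[(Int, Int)]): Unit = {
    sink.addBatch(batchId, rows)
    catalogTable.foreach(catalog.refreshTable)
  }
}
```

Modelling the destination as an `Option` keeps the path-only case (no catalog table) a no-op, which matches the `plan.catalogTable.foreach` shape in the diff.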





[GitHub] [spark] HeartSaVioR commented on pull request #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37368:
URL: https://github.com/apache/spark/pull/37368#issuecomment-1201948170

   cc. @cloud-fan @viirya @zsxwing @xuanyuanking Appreciate your review. Thanks!




[GitHub] [spark] HeartSaVioR commented on pull request #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37368:
URL: https://github.com/apache/spark/pull/37368#issuecomment-1202207389

   This only fails on sparkr: https://github.com/HeartSaVioR/spark/runs/7624154326
   
   And the error message does not seem to relate to the actual test failure.
   
   ```
   2022-08-02T07:56:10.2567549Z Status: 1 WARNING, 2 NOTEs
   2022-08-02T07:56:10.2567628Z See
   2022-08-02T07:56:10.2567843Z   ‘/__w/spark/spark/R/SparkR.Rcheck/00check.log’
   2022-08-02T07:56:10.2567935Z for details.
   2022-08-02T07:56:10.2567945Z 
   2022-08-02T07:56:10.2568078Z 
   2022-08-02T07:56:10.2568164Z + popd
   2022-08-02T07:56:10.2568340Z Had CRAN check errors; see logs.
   2022-08-02T07:56:10.2570433Z [error] running /__w/spark/spark/R/run-tests.sh ; received return code 255
   2022-08-02T07:56:10.5623856Z ##[error]Process completed with exit code 20.
   ```
   
   




[GitHub] [spark] HeartSaVioR commented on pull request #37368: [SPARK-39940][SS] Refresh catalog table on streaming query with DSv1 sink

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37368:
URL: https://github.com/apache/spark/pull/37368#issuecomment-1202208665

   Thanks! Merging to master. (Since this depends on SPARK-39564, which is only in 3.4.0, I'll merge the fix to 3.4.0 only for now.)

