Posted to commits@spark.apache.org by gu...@apache.org on 2022/09/20 03:47:54 UTC

[spark] branch branch-3.3 updated: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new fca6ab996d6 [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
fca6ab996d6 is described below

commit fca6ab996d6a9e013093d49625bf7e6c15d1c0d2
Author: yaohua <ya...@databricks.com>
AuthorDate: Tue Sep 20 12:46:58 2022 +0900

    [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
    
    ### What changes were proposed in this pull request?
    
    Cherry-picked from #37905
    
    Streaming metrics report all 0 (`processedRowsPerSecond`, etc.) when selecting the `_metadata` column, because the logical plan from the batch and the actual planned logical plan are mismatched. As a result, [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L348) we cannot find the plan and therefore cannot collect metrics correctly.
    
    This PR fixes the issue by replacing the initial `LogicalPlan` with the `LogicalPlan` that contains the metadata column.
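    
    For illustration, here is a minimal, hedged sketch of a query that exercises this path (the paths, schema, and object name below are made up for this example and are not part of the patch). Before this fix, `lastProgress` reported zero rows for such a query; with it, the metrics reflect the rows actually read:
    
    ```scala
    // Hypothetical reproduction sketch for SPARK-40460; paths and schema are illustrative only.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    object Spark40460Sketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("spark-40460-sketch").getOrCreate()
        val schema = StructType(Seq(StructField("name", StringType)))
    
        // Selecting _metadata makes MicroBatchExecution rewrite the source relation
        // with the metadata column, which is where the plan mismatch used to occur.
        val query = spark.readStream
          .format("json")
          .schema(schema)
          .load("/tmp/spark-40460/input")
          .select("*", "_metadata")
          .writeStream
          .format("json")
          .option("checkpointLocation", "/tmp/spark-40460/checkpoint")
          .trigger(Trigger.AvailableNow())
          .start("/tmp/spark-40460/output")
    
        query.awaitTermination()
        // Before the fix these reported 0; after it they match the rows read from the source.
        println(query.lastProgress.numInputRows)
        println(query.lastProgress.processedRowsPerSecond)
        spark.stop()
      }
    }
    ```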
    
    ### Why are the changes needed?
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing + New UTs
    
    Closes #37932 from Yaohua628/spark-40460-3-3.
    
    Authored-by: yaohua <ya...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../execution/streaming/MicroBatchExecution.scala  | 14 +++++---
 .../datasources/FileMetadataStructSuite.scala      | 38 ++++++++++++++++++++--
 2 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 3b409fa2f6a..d8806f03443 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable
 
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -540,7 +541,7 @@ class MicroBatchExecution(
     logDebug(s"Running batch $currentBatchId")
 
     // Request unprocessed data from all sources.
-    newData = reportTimeTaken("getBatch") {
+    val mutableNewData = mutable.Map.empty ++ reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
         case (source: Source, available: Offset)
           if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
@@ -577,7 +578,7 @@ class MicroBatchExecution(
     val newBatchesPlan = logicalPlan transform {
       // For v1 sources.
       case StreamingExecutionRelation(source, output) =>
-        newData.get(source).map { dataPlan =>
+        mutableNewData.get(source).map { dataPlan =>
           val hasFileMetadata = output.exists {
             case FileSourceMetadataAttribute(_) => true
             case _ => false
@@ -586,6 +587,11 @@ class MicroBatchExecution(
             case l: LogicalRelation if hasFileMetadata => l.withMetadataColumns()
             case _ => dataPlan
           }
+          // SPARK-40460: overwrite the entry with the new logicalPlan
+          // because it might contain the _metadata column. This change is necessary because
+          // the ProgressReporter uses the following mapping to get correct streaming metrics:
+          // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+          mutableNewData.put(source, finalDataPlan)
           val maxFields = SQLConf.get.maxToStringFields
           assert(output.size == finalDataPlan.output.size,
             s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
@@ -601,14 +607,14 @@ class MicroBatchExecution(
 
       // For v2 sources.
       case r: StreamingDataSourceV2Relation =>
-        newData.get(r.stream).map {
+        mutableNewData.get(r.stream).map {
           case OffsetHolder(start, end) =>
             r.copy(startOffset = Some(start), endOffset = Some(end))
         }.getOrElse {
           LocalRelation(r.output, isStreaming = true)
         }
     }
-
+    newData = mutableNewData.toMap
     // Rewire the plan to use the new attributes that were returned by the source.
     val newAttributePlan = newBatchesPlan.transformAllExpressionsWithPruning(
       _.containsPattern(CURRENT_LIKE)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 6afea42ee83..ad75f634050 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Ro
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
 
@@ -518,16 +519,19 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     withTempDir { dir =>
       df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
 
-      val stream = spark.readStream.format("json")
+      val streamDf = spark.readStream.format("json")
         .schema(schema)
         .load(dir.getCanonicalPath + "/source/new-streaming-data")
         .select("*", "_metadata")
+
+      val streamQuery0 = streamDf
         .writeStream.format("json")
         .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .trigger(Trigger.AvailableNow())
         .start(dir.getCanonicalPath + "/target/new-streaming-data")
 
-      stream.processAllAvailable()
-      stream.stop()
+      streamQuery0.awaitTermination()
+      assert(streamQuery0.lastProgress.numInputRows == 2L)
 
       val newDF = spark.read.format("json")
         .load(dir.getCanonicalPath + "/target/new-streaming-data")
@@ -565,6 +569,34 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
             sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
         )
       )
+
+      // Verify self-union
+      val streamQuery1 = streamDf.union(streamDf)
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint_union")
+        .trigger(Trigger.AvailableNow())
+        .start(dir.getCanonicalPath + "/target/new-streaming-data-union")
+      streamQuery1.awaitTermination()
+      val df1 = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data-union")
+      // Verify self-union results
+      assert(streamQuery1.lastProgress.numInputRows == 4L)
+      assert(df1.count() == 4L)
+      assert(df1.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
+
+      // Verify self-join
+      val streamQuery2 = streamDf.join(streamDf, Seq("name", "age", "info", "_metadata"))
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint_join")
+        .trigger(Trigger.AvailableNow())
+        .start(dir.getCanonicalPath + "/target/new-streaming-data-join")
+      streamQuery2.awaitTermination()
+      val df2 = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data-join")
+      // Verify self-join results
+      assert(streamQuery2.lastProgress.numInputRows == 4L)
+      assert(df2.count() == 2L)
+      assert(df2.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org