Posted to commits@spark.apache.org by gu...@apache.org on 2022/09/20 03:47:54 UTC
[spark] branch branch-3.3 updated: [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new fca6ab996d6 [SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
fca6ab996d6 is described below
commit fca6ab996d6a9e013093d49625bf7e6c15d1c0d2
Author: yaohua <ya...@databricks.com>
AuthorDate: Tue Sep 20 12:46:58 2022 +0900
[SPARK-40460][SS][3.3] Fix streaming metrics when selecting _metadata
### What changes were proposed in this pull request?
Cherry-picked from #37905
Streaming metrics report all 0 (`processedRowsPerSecond`, etc.) when selecting the `_metadata` column, because the logical plan from the batch and the actual planned logical plan are mismatched. As a result, [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L348) we cannot find the plan and therefore cannot collect the metrics correctly.
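As a concrete illustration (condensed from the new test further down; the paths, schema, and variable names are illustrative only, and the snippet assumes an active `spark` session such as in `spark-shell`), a file-source query that selects `_metadata` used to report 0 input rows in its progress:

```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

// Illustrative schema and paths; any file source that selects _metadata shows the symptom.
val schema = new StructType().add("name", StringType).add("age", IntegerType)

val streamDf = spark.readStream.format("json")
  .schema(schema)
  .load("/tmp/source/new-streaming-data")
  .select("*", "_metadata")

val query = streamDf
  .writeStream.format("json")
  .option("checkpointLocation", "/tmp/target/checkpoint")
  .trigger(Trigger.AvailableNow())
  .start("/tmp/target/new-streaming-data")
query.awaitTermination()

// Before this fix, this reported 0 even though rows were processed.
println(query.lastProgress.numInputRows)
```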
This PR fixes the issue by replacing the initial `LogicalPlan` with the `LogicalPlan` that contains the metadata column.
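The shape of the fix, in a minimal self-contained sketch (the `Source`/`LogicalPlan` case classes and the `collectInputRows` helper are hypothetical stand-ins for Spark's internals, only to show why overwriting the map entry restores the lookup):

```scala
object PlanLookupSketch {
  // Hypothetical stand-ins for Spark's internal types.
  case class Source(name: String)
  case class LogicalPlan(desc: String)

  // ProgressReporter-style lookup: rows are attributed to a source only if the
  // plan recorded for that source matches a leaf of the executed plan.
  def collectInputRows(
      newData: Map[Source, LogicalPlan],
      executedLeafRows: Map[LogicalPlan, Long]): Map[Source, Long] =
    newData.map { case (src, plan) => src -> executedLeafRows.getOrElse(plan, 0L) }

  def main(args: Array[String]): Unit = {
    val src = Source("file-source")
    val dataPlan = LogicalPlan("relation")
    val withMetadata = LogicalPlan("relation + _metadata") // plan after adding _metadata
    val executed = Map(withMetadata -> 2L)

    // Before the fix: newData still holds the pre-rewrite plan, so the lookup misses.
    println(collectInputRows(Map(src -> dataPlan), executed))     // Map(Source(file-source) -> 0)

    // After the fix: the entry is overwritten with the rewritten plan, so the lookup hits.
    println(collectInputRows(Map(src -> withMetadata), executed)) // Map(Source(file-source) -> 2)
  }
}
```

In the actual change (see the diff below), `newData` is built from a `mutable.Map` so the entry for a file source can be overwritten with the metadata-bearing plan during the plan transformation, and it is converted back with `.toMap` afterwards.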
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing and new unit tests.
Closes #37932 from Yaohua628/spark-40460-3-3.
Authored-by: yaohua <ya...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../execution/streaming/MicroBatchExecution.scala | 14 +++++---
.../datasources/FileMetadataStructSuite.scala | 38 ++++++++++++++++++++--
2 files changed, 45 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 3b409fa2f6a..d8806f03443 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.streaming
import scala.collection.mutable.{Map => MutableMap}
+import scala.collection.mutable
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -540,7 +541,7 @@ class MicroBatchExecution(
logDebug(s"Running batch $currentBatchId")
// Request unprocessed data from all sources.
- newData = reportTimeTaken("getBatch") {
+ val mutableNewData = mutable.Map.empty ++ reportTimeTaken("getBatch") {
availableOffsets.flatMap {
case (source: Source, available: Offset)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
@@ -577,7 +578,7 @@ class MicroBatchExecution(
val newBatchesPlan = logicalPlan transform {
// For v1 sources.
case StreamingExecutionRelation(source, output) =>
- newData.get(source).map { dataPlan =>
+ mutableNewData.get(source).map { dataPlan =>
val hasFileMetadata = output.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
@@ -586,6 +587,11 @@ class MicroBatchExecution(
case l: LogicalRelation if hasFileMetadata => l.withMetadataColumns()
case _ => dataPlan
}
+ // SPARK-40460: overwrite the entry with the new logicalPlan
+ // because it might contain the _metadata column. It is a necessary change,
+ // in the ProgressReporter, we use the following mapping to get correct streaming metrics:
+ // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+ mutableNewData.put(source, finalDataPlan)
val maxFields = SQLConf.get.maxToStringFields
assert(output.size == finalDataPlan.output.size,
s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " +
@@ -601,14 +607,14 @@ class MicroBatchExecution(
// For v2 sources.
case r: StreamingDataSourceV2Relation =>
- newData.get(r.stream).map {
+ mutableNewData.get(r.stream).map {
case OffsetHolder(start, end) =>
r.copy(startOffset = Some(start), endOffset = Some(end))
}.getOrElse {
LocalRelation(r.output, isStreaming = true)
}
}
-
+ newData = mutableNewData.toMap
// Rewire the plan to use the new attributes that were returned by the source.
val newAttributePlan = newBatchesPlan.transformAllExpressionsWithPruning(
_.containsPattern(CURRENT_LIKE)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 6afea42ee83..ad75f634050 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Ro
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
@@ -518,16 +519,19 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
- val stream = spark.readStream.format("json")
+ val streamDf = spark.readStream.format("json")
.schema(schema)
.load(dir.getCanonicalPath + "/source/new-streaming-data")
.select("*", "_metadata")
+
+ val streamQuery0 = streamDf
.writeStream.format("json")
.option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+ .trigger(Trigger.AvailableNow())
.start(dir.getCanonicalPath + "/target/new-streaming-data")
- stream.processAllAvailable()
- stream.stop()
+ streamQuery0.awaitTermination()
+ assert(streamQuery0.lastProgress.numInputRows == 2L)
val newDF = spark.read.format("json")
.load(dir.getCanonicalPath + "/target/new-streaming-data")
@@ -565,6 +569,34 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
)
)
+
+ // Verify self-union
+ val streamQuery1 = streamDf.union(streamDf)
+ .writeStream.format("json")
+ .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint_union")
+ .trigger(Trigger.AvailableNow())
+ .start(dir.getCanonicalPath + "/target/new-streaming-data-union")
+ streamQuery1.awaitTermination()
+ val df1 = spark.read.format("json")
+ .load(dir.getCanonicalPath + "/target/new-streaming-data-union")
+ // Verify self-union results
+ assert(streamQuery1.lastProgress.numInputRows == 4L)
+ assert(df1.count() == 4L)
+ assert(df1.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
+
+ // Verify self-join
+ val streamQuery2 = streamDf.join(streamDf, Seq("name", "age", "info", "_metadata"))
+ .writeStream.format("json")
+ .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint_join")
+ .trigger(Trigger.AvailableNow())
+ .start(dir.getCanonicalPath + "/target/new-streaming-data-join")
+ streamQuery2.awaitTermination()
+ val df2 = spark.read.format("json")
+ .load(dir.getCanonicalPath + "/target/new-streaming-data-join")
+ // Verify self-join results
+ assert(streamQuery2.lastProgress.numInputRows == 4L)
+ assert(df2.count() == 2L)
+ assert(df2.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org