Posted to commits@spark.apache.org by ka...@apache.org on 2022/06/27 20:48:42 UTC

[spark] branch master updated: [SPARK-39564][SS] Expose the information of catalog table to the logical plan in streaming query

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c46b06c273b [SPARK-39564][SS] Expose the information of catalog table to the logical plan in streaming query
c46b06c273b is described below

commit c46b06c273b06df8fa06c8ca9138d81f520c8d7d
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Tue Jun 28 05:48:28 2022 +0900

    [SPARK-39564][SS] Expose the information of catalog table to the logical plan in streaming query
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to expose the catalog table information (V1/V2) in the logical plans of a streaming query, specifically the parsed and analyzed plans. (Some of this information may be discarded in the optimized plan.)
    
    The major change is to propagate the catalog table information from the place where the table is resolved to the place where the query is executed. Micro-batch execution applies several transformations that replace one logical node with another, hence this PR touches multiple logical nodes along that code path.
    
    For the DSv1 sink specifically, there is no dedicated write logical node, so it is otherwise not feasible to expose information about the destination. This PR introduces `WriteToMicroBatchDataSourceV1`, the DSv1 counterpart of `WriteToMicroBatchDataSource`, as a logical node for the DSv1 sink. Note that `WriteToMicroBatchDataSourceV1` acts only as a marker - it is eliminated in the streaming-specific optimization phase.
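    
    For illustration, a minimal, Spark-independent sketch of that marker-node pattern (the `Plan`/`WriteMarker` types below are hypothetical stand-ins, not the actual Catalyst classes; the real node added by this PR is `WriteToMicroBatchDataSourceV1`, shown in the diff below):
    
    ```scala
    // Hypothetical mini "plan" types standing in for Catalyst nodes, only to show the idea:
    // the marker wraps the query solely to carry sink information for explain output and is
    // stripped again before optimization, so it never needs a physical counterpart.
    sealed trait Plan
    case class Scan(table: String) extends Plan
    case class Project(cols: Seq[String], child: Plan) extends Plan
    case class WriteMarker(sinkTable: String, child: Plan) extends Plan
    
    def stripMarker(p: Plan): Plan = p match {
      case WriteMarker(_, child) => stripMarker(child)               // eliminate the marker node
      case Project(cols, child)  => Project(cols, stripMarker(child))
      case other                 => other
    }
    
    val analyzed = WriteMarker("stream_target", Project(Seq("col1"), Scan("stream_source")))
    assert(stripMarker(analyzed) == Project(Seq("col1"), Scan("stream_source")))
    ```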
    
    ### Why are the changes needed?
    
    This PR gives a better UX to end users who use the table API for streaming queries. Previously it was not easy, and sometimes not even feasible, to check which tables a streaming query reads from and writes to; most likely end users had to look into their code or query.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. In the parsed/analyzed plans, we now expose the table information in the read/write logical nodes. Specifically for DSv1, we introduce a marker write node to expose the destination information without major changes to existing logic.
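    
    For example (a sketch, not code from this patch), the propagated catalog table can now be pattern-matched out of the v1 streaming relation of a micro-batch logical plan:
    
    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
    
    // `plan` would be a micro-batch logical plan (e.g. StreamExecution.logicalPlan, as used in
    // the test suites touched by this patch); the new third constructor field added here
    // carries the source's CatalogTable, when known.
    def v1ReadTables(plan: LogicalPlan): Seq[String] = plan.collect {
      case StreamingExecutionRelation(_, _, Some(tbl)) => tbl.identifier.unquotedString
    }
    ```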
    
    > DSv1 read and write
    
    >> Before the patch
    
    <img width="1635" alt="original-read-from-dsv1-write-to-dsv1" src="https://user-images.githubusercontent.com/1317309/175210731-dcc4cc4d-a70b-467d-b577-79c20600db32.png">
    
    >> After the patch
    
    <img width="1716" alt="proposal-read-from-dsv1-write-to-dsv1" src="https://user-images.githubusercontent.com/1317309/175262327-960a23f5-6b24-4b89-bdaf-766a5c31aaf1.png">
    
    > DSv2 read and write
    
    >> Before the patch
    
    <img width="1684" alt="original-read-from-dsv2-write-to-dsv2" src="https://user-images.githubusercontent.com/1317309/175210780-4a99c670-8a42-4511-959c-cafe0c24bc00.png">
    
    >> After the patch
    
    <img width="1755" alt="proposal-read-from-dsv2-write-to-dsv2" src="https://user-images.githubusercontent.com/1317309/175261938-fd7804a6-b98e-4202-ae23-622b512c66fa.png">
    
    Note that the screenshots were taken with the config "spark.sql.ui.explainMode=extended". By default, only the physical plan is shown, in formatted mode, which hides the improvement made here. Still, end users can run `query.explain(extended=true)` to print all plans, including the parsed/analyzed ones.
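    
    For reference, a minimal sketch of the latter (assuming the `stream_source`/`stream_target` tables from the scripts below; the checkpoint path is a placeholder):
    
    ```scala
    // `q` is the streaming query handle returned by the table API.
    val q = spark.readStream.table("stream_source")
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/ckpt-demo")
      .toTable("stream_target")
    
    // After at least one micro-batch has completed (e.g. insert data into stream_source and call
    // q.processAllAvailable()), this prints the parsed, analyzed, optimized and physical plans;
    // with this patch the parsed/analyzed plans show the source and destination tables.
    q.explain(extended = true)
    
    q.stop()
    ```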
    
    ### How was this patch tested?
    
    New test cases. Also manually tested by running the following queries and checking the UI pages:
    
    > DSv1 read and write
    
    ```
    /*
    ./bin/spark-shell --conf "spark.sql.ui.explainMode=extended"
    */
    
    spark.sql("drop table if exists stream_source")
    
    spark.sql("drop table if exists stream_target")
    
    spark.sql("create table stream_source (col1 string, col2 int) using parquet")
    
    spark.sql("create table stream_target (col1 string, col2 int) using parquet")
    
    val checkpointDir = java.nio.file.Files.createTempDirectory("checkpoint-")
    
    val q = spark.readStream.table("stream_source").writeStream.format("parquet").option("checkpointLocation", checkpointDir.toString).toTable("stream_target")
    
    Thread.sleep(10000)
    
    spark.sql("insert into stream_source values ('a', 1)")
    spark.sql("insert into stream_source values ('a', 2)")
    spark.sql("insert into stream_source values ('a', 3)")
    
    q.processAllAvailable()
    
    spark.sql("insert into stream_source values ('b', 1)")
    spark.sql("insert into stream_source values ('b', 2)")
    spark.sql("insert into stream_source values ('b', 3)")
    
    q.processAllAvailable()
    
    spark.sql("insert into stream_source values ('c', 1)")
    spark.sql("insert into stream_source values ('c', 2)")
    spark.sql("insert into stream_source values ('c', 3)")
    
    q.processAllAvailable()
    
    q.stop()
    ```
    
    > DSv2 read and write
    
    ```
    /*
    ./bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1\
        --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
        --conf spark.sql.catalog.local.type=hadoop \
        --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
        --conf spark.sql.ui.explainMode=extended
    */
    
    spark.sql("drop table if exists local.db.stream_target")
    
    spark.sql("create table local.db.stream_source (col1 string, col2 int) using iceberg")
    
    spark.sql("create table local.db.stream_target (col1 string, col2 int) using iceberg")
    
    val checkpointDir = java.nio.file.Files.createTempDirectory("checkpoint-")
    
    val q = spark.readStream.table("local.db.stream_source").writeStream.format("iceberg").option("checkpointLocation", checkpointDir.toString).toTable("local.db.stream_target")
    
    Thread.sleep(10000)
    
    spark.sql("insert into local.db.stream_source values ('a', 1)")
    spark.sql("insert into local.db.stream_source values ('a', 2)")
    spark.sql("insert into local.db.stream_source values ('a', 3)")
    
    q.processAllAvailable()
    
    spark.sql("insert into local.db.stream_source values ('b', 1)")
    spark.sql("insert into local.db.stream_source values ('b', 2)")
    spark.sql("insert into local.db.stream_source values ('b', 3)")
    
    q.processAllAvailable()
    
    spark.sql("insert into local.db.stream_source values ('c', 1)")
    spark.sql("insert into local.db.stream_source values ('c', 2)")
    spark.sql("insert into local.db.stream_source values ('c', 3)")
    
    q.processAllAvailable()
    
    q.stop()
    ```
    
    Closes #36963 from HeartSaVioR/SPARK-39564.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |   4 +-
 .../sql/catalyst/streaming/WriteToStream.scala     |   4 +-
 .../streaming/WriteToStreamStatement.scala         |   4 +-
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  15 ++-
 .../datasources/v2/DataSourceV2Relation.scala      |  19 +++-
 .../spark/sql/execution/SparkStrategies.scala      |  12 +-
 .../execution/datasources/DataSourceStrategy.scala |   3 +-
 .../execution/streaming/IncrementalExecution.scala |   9 +-
 .../execution/streaming/MicroBatchExecution.scala  |  42 +++++--
 .../execution/streaming/ResolveWriteToStream.scala |   3 +-
 .../execution/streaming/StreamingRelation.scala    |  18 ++-
 .../streaming/continuous/ContinuousExecution.scala |   5 +-
 .../streaming/sources/MicroBatchWrite.scala        |  11 +-
 .../sources/WriteToMicroBatchDataSourceV1.scala    |  55 +++++++++
 .../spark/sql/streaming/DataStreamWriter.scala     |  23 ++--
 .../sql/streaming/StreamingQueryManager.scala      |  13 ++-
 .../sql-tests/results/explain-aqe.sql.out          |  14 +--
 .../resources/sql-tests/results/explain.sql.out    |  14 +--
 .../sql/streaming/FileStreamSourceSuite.scala      |   2 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |   3 +-
 .../streaming/test/DataStreamTableAPISuite.scala   | 124 ++++++++++++++++++++-
 21 files changed, 322 insertions(+), 75 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index e7a12dedf21..5a8caef9e5e 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -120,7 +120,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K
 
       val sources: Seq[SparkDataStream] = {
         query.get.logicalPlan.collect {
-          case StreamingExecutionRelation(source: KafkaSource, _) => source
+          case StreamingExecutionRelation(source: KafkaSource, _, _) => source
           case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
               r.stream.isInstanceOf[KafkaContinuousStream] =>
             r.stream
@@ -1392,7 +1392,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.collect {
-          case StreamingExecutionRelation(_: KafkaSource, _) => true
+          case StreamingExecutionRelation(_: KafkaSource, _, _) => true
         }.nonEmpty
       }
     )
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
index 80c441f184d..884a4165d07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.streaming
 
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
@@ -32,7 +33,8 @@ case class WriteToStream(
     outputMode: OutputMode,
     deleteCheckpointOnStop: Boolean,
     inputQuery: LogicalPlan,
-    catalogAndIdent: Option[(TableCatalog, Identifier)] = None) extends UnaryNode {
+    catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
+    catalogTable: Option[CatalogTable]) extends UnaryNode {
 
   override def isStreaming: Boolean = true
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
index 85a018a8f55..a6204b317d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.streaming
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
@@ -52,7 +53,8 @@ case class WriteToStreamStatement(
     hadoopConf: Configuration,
     isContinuousTrigger: Boolean,
     inputQuery: LogicalPlan,
-    catalogAndIdent: Option[(TableCatalog, Identifier)] = None) extends UnaryNode {
+    catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
+    catalogTable: Option[CatalogTable] = None) extends UnaryNode {
 
   override def isStreaming: Boolean = true
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index fcbebf3ac7a..2d9a7b878b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -868,6 +868,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
     case null => Nil
     case None => Nil
     case Some(null) => Nil
+    case Some(table: CatalogTable) =>
+      stringArgsForCatalogTable(table)
     case Some(any) => any :: Nil
     case map: CaseInsensitiveStringMap =>
       redactMapString(map.asCaseSensitiveMap().asScala, maxFields)
@@ -877,13 +879,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
       t.copy(properties = Utils.redact(t.properties).toMap,
         options = Utils.redact(t.options).toMap) :: Nil
     case table: CatalogTable =>
-      table.storage.serde match {
-        case Some(serde) => table.identifier :: serde :: Nil
-        case _ => table.identifier :: Nil
-      }
+      stringArgsForCatalogTable(table)
+
     case other => other :: Nil
   }.mkString(", ")
 
+  private def stringArgsForCatalogTable(table: CatalogTable): Seq[Any] = {
+    table.storage.serde match {
+      case Some(serde) => table.identifier :: serde :: Nil
+      case _ => table.identifier :: Nil
+    }
+  }
+
   /**
    * ONE line description of this node.
    * @param maxFields Maximum number of fields that will be converted to strings.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 178a6b02875..2045c599337 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -68,7 +68,11 @@ case class DataSourceV2Relation(
   override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
 
   override def simpleString(maxFields: Int): String = {
-    s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
+    val qualifiedTableName = (catalog, identifier) match {
+      case (Some(cat), Some(ident)) => s"${cat.name()}.${ident.toString}"
+      case _ => ""
+    }
+    s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $qualifiedTableName $name"
   }
 
   override def computeStats(): Statistics = {
@@ -152,6 +156,8 @@ case class StreamingDataSourceV2Relation(
     output: Seq[Attribute],
     scan: Scan,
     stream: SparkDataStream,
+    catalog: Option[CatalogPlugin],
+    identifier: Option[Identifier],
     startOffset: Option[Offset] = None,
     endOffset: Option[Offset] = None)
   extends LeafNode with MultiInstanceRelation {
@@ -167,6 +173,17 @@ case class StreamingDataSourceV2Relation(
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
+
+  private val stringArgsVal: Seq[Any] = {
+    val qualifiedTableName = (catalog, identifier) match {
+      case (Some(cat), Some(ident)) => Some(s"${cat.name()}.${ident.toString}")
+      case _ => None
+    }
+
+    Seq(output, qualifiedTableName, scan, stream, startOffset, endOffset)
+  }
+
+  override protected def stringArgs: Iterator[Any] = stringArgsVal.iterator
 }
 
 object DataSourceV2Relation {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 34891b3d1ab..e6ec97b3491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -650,11 +650,17 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object StreamingRelationStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case s: StreamingRelation =>
-        StreamingRelationExec(s.sourceName, s.output) :: Nil
+        val qualifiedTableName = s.dataSource.catalogTable.map(_.identifier.unquotedString)
+        StreamingRelationExec(s.sourceName, s.output, qualifiedTableName) :: Nil
       case s: StreamingExecutionRelation =>
-        StreamingRelationExec(s.toString, s.output) :: Nil
+        val qualifiedTableName = s.catalogTable.map(_.identifier.unquotedString)
+        StreamingRelationExec(s.toString, s.output, qualifiedTableName) :: Nil
       case s: StreamingRelationV2 =>
-        StreamingRelationExec(s.sourceName, s.output) :: Nil
+        val qualifiedTableName = (s.catalog, s.identifier) match {
+          case (Some(catalog), Some(identifier)) => Some(s"${catalog.name}.${identifier}")
+          case _ => None
+        }
+        StreamingRelationExec(s.sourceName, s.output, qualifiedTableName) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 564668ab392..ba4c76285d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -272,7 +272,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       SparkSession.active,
       className = table.provider.get,
       userSpecifiedSchema = Some(table.schema),
-      options = dsOptions)
+      options = dsOptions,
+      catalogTable = Some(table))
     StreamingRelation(dataSource)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 9670c774a74..3f369ac5e97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.execution.{LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessionsExec, ObjectHashAggregateExec, SortAggregateExec, UpdatingSessionsExec}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.util.Utils
@@ -77,7 +78,13 @@ class IncrementalExecution(
    */
   override
   lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
+    // Performing streaming specific pre-optimization.
+    val preOptimized = withCachedData.transform {
+      // We eliminate the "marker" node for writer on DSv1 as it's only used as representation
+      // of sink information.
+      case w: WriteToMicroBatchDataSourceV1 => w.child
+    }
+    sparkSession.sessionState.optimizer.executeAndTrack(preOptimized,
       tracker).transformAllExpressionsWithPruning(
       _.containsAnyPattern(CURRENT_LIKE, EXPRESSION_WITH_RANDOM_SEED)) {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index aebf2e43de8..3dd0670d9f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
+import org.apache.spark.sql.execution.streaming.sources.{WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.{Clock, Utils}
@@ -86,10 +86,11 @@ class MicroBatchExecution(
           val source = dataSourceV1.createSource(metadataPath)
           nextSourceId += 1
           logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
-          StreamingExecutionRelation(source, output)(sparkSession)
+          StreamingExecutionRelation(source, output, dataSourceV1.catalogTable)(sparkSession)
         })
 
-      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, _, _, v1) =>
+      case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output,
+        catalog, identifier, v1) =>
         val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
         val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
         if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
@@ -101,7 +102,7 @@ class MicroBatchExecution(
             // TODO: operator pushdown.
             val scan = table.newScanBuilder(options).build()
             val stream = scan.toMicroBatchStream(metadataPath)
-            StreamingDataSourceV2Relation(output, scan, stream)
+            StreamingDataSourceV2Relation(output, scan, stream, catalog, identifier)
           })
         } else if (v1.isEmpty) {
           throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(srcName)
@@ -113,7 +114,9 @@ class MicroBatchExecution(
               v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
             nextSourceId += 1
             logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' $dsStr")
-            StreamingExecutionRelation(source, output)(sparkSession)
+            // We don't have a catalog table but may have a table identifier. Given this is about
+            // v1 fallback path, we just give up and set the catalog table as None.
+            StreamingExecutionRelation(source, output, None)(sparkSession)
           })
         }
     }
@@ -168,7 +171,17 @@ class MicroBatchExecution(
           extraOptions,
           outputMode)
 
-      case _ => _logicalPlan
+      case s: Sink =>
+        WriteToMicroBatchDataSourceV1(
+          plan.catalogTable,
+          sink = s,
+          query = _logicalPlan,
+          queryId = id.toString,
+          extraOptions,
+          outputMode)
+
+      case _ =>
+        throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
   }
 
@@ -576,14 +589,24 @@ class MicroBatchExecution(
     // Replace sources in the logical plan with data that has arrived since the last batch.
     val newBatchesPlan = logicalPlan transform {
       // For v1 sources.
-      case StreamingExecutionRelation(source, output) =>
+      case StreamingExecutionRelation(source, output, catalogTable) =>
         newData.get(source).map { dataPlan =>
           val hasFileMetadata = output.exists {
             case FileSourceMetadataAttribute(_) => true
             case _ => false
           }
           val finalDataPlan = dataPlan transformUp {
-            case l: LogicalRelation if hasFileMetadata => l.withMetadataColumns()
+            case l: LogicalRelation =>
+              var newRelation = l
+              if (hasFileMetadata) {
+                newRelation = newRelation.withMetadataColumns()
+              }
+              catalogTable.foreach { table =>
+                assert(newRelation.catalogTable.isEmpty,
+                  s"Source $source should not produce the information of catalog table by its own.")
+                newRelation = newRelation.copy(catalogTable = Some(table))
+              }
+              newRelation
           }
           val maxFields = SQLConf.get.maxToStringFields
           assert(output.size == finalDataPlan.output.size,
@@ -626,7 +649,8 @@ class MicroBatchExecution(
     }
 
     val triggerLogicalPlan = sink match {
-      case _: Sink => newAttributePlan
+      case _: Sink =>
+        newAttributePlan.asInstanceOf[WriteToMicroBatchDataSourceV1].withNewBatchId(currentBatchId)
       case _: SupportsWrite =>
         newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].withNewBatchId(currentBatchId)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
index 1c2554b974f..fa2a7885eb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
@@ -61,7 +61,8 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
         s.outputMode,
         deleteCheckpointOnStop,
         s.inputQuery,
-        s.catalogAndIdent)
+        s.catalogAndIdent,
+        s.catalogTable)
   }
 
   def resolveCheckpointLocation(s: WriteToStreamStatement): (String, Boolean) = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 00962a4f4cd..7b177f3b67d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -89,7 +90,8 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  */
 case class StreamingExecutionRelation(
     source: SparkDataStream,
-    output: Seq[Attribute])(session: SparkSession)
+    output: Seq[Attribute],
+    catalogTable: Option[CatalogTable])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
   override def otherCopyArgs: Seq[AnyRef] = session :: Nil
@@ -111,7 +113,10 @@ case class StreamingExecutionRelation(
  * A dummy physical plan for [[StreamingRelation]] to support
  * [[org.apache.spark.sql.Dataset.explain]]
  */
-case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) extends LeafExecNode {
+case class StreamingRelationExec(
+    sourceName: String,
+    output: Seq[Attribute],
+    tableIdentifier: Option[String]) extends LeafExecNode {
   override def toString: String = sourceName
   override protected def doExecute(): RDD[InternalRow] = {
     throw QueryExecutionErrors.cannotExecuteStreamingRelationExecError()
@@ -120,6 +125,13 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext
 
 object StreamingExecutionRelation {
   def apply(source: Source, session: SparkSession): StreamingExecutionRelation = {
-    StreamingExecutionRelation(source, source.schema.toAttributes)(session)
+    StreamingExecutionRelation(source, source.schema.toAttributes, None)(session)
+  }
+
+  def apply(
+      source: Source,
+      session: SparkSession,
+      catalogTable: CatalogTable): StreamingExecutionRelation = {
+    StreamingExecutionRelation(source, source.schema.toAttributes, Some(catalogTable))(session)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 665ed77007b..09b54889bfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -64,7 +64,8 @@ class ContinuousExecution(
     var nextSourceId = 0
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
-      case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _, _, _) =>
+      case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output,
+        catalog, identifier, _) =>
         val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
         if (!table.supports(TableCapability.CONTINUOUS_READ)) {
           throw QueryExecutionErrors.continuousProcessingUnsupportedByDataSourceError(sourceName)
@@ -77,7 +78,7 @@ class ContinuousExecution(
           // TODO: operator pushdown.
           val scan = table.newScanBuilder(options).build()
           val stream = scan.toContinuousStream(metadataPath)
-          StreamingDataSourceV2Relation(output, scan, stream)
+          StreamingDataSourceV2Relation(output, scan, stream, catalog, identifier)
         })
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
index c2adc1dd674..0a603a3b141 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
@@ -26,18 +26,21 @@ import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactor
  * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
  * streaming write support.
  */
-class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWrite) extends BatchWrite {
+class MicroBatchWrite(epochId: Long, val writeSupport: StreamingWrite) extends BatchWrite {
+  override def toString: String = {
+    s"MicroBathWrite[epoch: $epochId, writer: $writeSupport]"
+  }
 
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    writeSupport.commit(eppchId, messages)
+    writeSupport.commit(epochId, messages)
   }
 
   override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    writeSupport.abort(eppchId, messages)
+    writeSupport.abort(epochId, messages)
   }
 
   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
-    new MicroBatchWriterFactory(eppchId, writeSupport.createStreamingWriterFactory(info))
+    new MicroBatchWriterFactory(epochId, writeSupport.createStreamingWriterFactory(info))
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSourceV1.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSourceV1.scala
new file mode 100644
index 00000000000..e67019274ec
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSourceV1.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.streaming.OutputMode
+
+/**
+ * Marker node to represent a DSv1 sink on streaming query.
+ *
+ * Although this node is expected to be the top node, it should behave like a "pass-through",
+ * since the DSv1 code path in micro-batch execution handles the sink operation separately.
+ *
+ * This node is eliminated in streaming specific optimization phase, which means there is no
+ * matching physical node.
+ */
+case class WriteToMicroBatchDataSourceV1(
+    catalogTable: Option[CatalogTable],
+    sink: Sink,
+    query: LogicalPlan,
+    queryId: String,
+    writeOptions: Map[String, String],
+    outputMode: OutputMode,
+    batchId: Option[Long] = None)
+  extends UnaryNode {
+
+  override def child: LogicalPlan = query
+
+  override def output: Seq[Attribute] = query.output
+
+  def withNewBatchId(batchId: Long): WriteToMicroBatchDataSourceV1 = {
+    copy(batchId = Some(batchId))
+  }
+
+  override protected def withNewChildInternal(
+      newChild: LogicalPlan): WriteToMicroBatchDataSourceV1 = copy(query = newChild)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index af058315f7c..0084ca53a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -319,8 +319,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         throw QueryCompilationErrors.inputSourceDiffersFromDataSourceProviderError(
           source, tableName, table)
       }
-      format(table.provider.get)
-        .option("path", new Path(table.location).toString).start()
+      format(table.provider.get).startInternal(
+        Some(new Path(table.location).toString), catalogTable = Some(table))
     }
 
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -335,7 +335,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     }
   }
 
-  private def startInternal(path: Option[String]): StreamingQuery = {
+  private def startInternal(
+      path: Option[String],
+      catalogTable: Option[CatalogTable] = None): StreamingQuery = {
     if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
       throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("write")
     }
@@ -348,20 +350,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val sink = new MemorySink()
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes))
       val recoverFromCheckpoint = outputMode == OutputMode.Complete()
-      val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromCheckpoint)
+      val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromCheckpoint,
+        catalogTable = catalogTable)
       resultDf.createOrReplaceTempView(query.name)
       query
     } else if (source == SOURCE_NAME_FOREACH) {
       assertNotPartitioned(SOURCE_NAME_FOREACH)
       val sink = ForeachWriterTable[T](foreachWriter, ds.exprEnc)
-      startQuery(sink, extraOptions)
+      startQuery(sink, extraOptions, catalogTable = catalogTable)
     } else if (source == SOURCE_NAME_FOREACH_BATCH) {
       assertNotPartitioned(SOURCE_NAME_FOREACH_BATCH)
       if (trigger.isInstanceOf[ContinuousTrigger]) {
         throw QueryCompilationErrors.sourceNotSupportedWithContinuousTriggerError(source)
       }
       val sink = new ForeachBatchSink[T](foreachBatchWriter, ds.exprEnc)
-      startQuery(sink, extraOptions)
+      startQuery(sink, extraOptions, catalogTable = catalogTable)
     } else {
       val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val disabledSources =
@@ -403,7 +406,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         createV1Sink(optionsWithPath)
       }
 
-      startQuery(sink, optionsWithPath)
+      startQuery(sink, optionsWithPath, catalogTable = catalogTable)
     }
   }
 
@@ -411,7 +414,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       sink: Table,
       newOptions: CaseInsensitiveMap[String],
       recoverFromCheckpoint: Boolean = true,
-      catalogAndIdent: Option[(TableCatalog, Identifier)] = None): StreamingQuery = {
+      catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
+      catalogTable: Option[CatalogTable] = None): StreamingQuery = {
     val useTempCheckpointLocation = SOURCES_ALLOW_ONE_TIME_QUERY.contains(source)
 
     df.sparkSession.sessionState.streamingQueryManager.startQuery(
@@ -424,7 +428,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       useTempCheckpointLocation = useTempCheckpointLocation,
       recoverFromCheckpointLocation = recoverFromCheckpoint,
       trigger = trigger,
-      catalogAndIdent = catalogAndIdent)
+      catalogAndIdent = catalogAndIdent,
+      catalogTable = catalogTable)
   }
 
   private def createV1Sink(optionsWithPath: CaseInsensitiveMap[String]): Sink = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 6548d5fe0a2..4f75e749ecc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog}
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -242,7 +243,8 @@ class StreamingQueryManager private[sql] (
       recoverFromCheckpointLocation: Boolean,
       trigger: Trigger,
       triggerClock: Clock,
-      catalogAndIdent: Option[(TableCatalog, Identifier)] = None): StreamingQueryWrapper = {
+      catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
+      catalogTable: Option[CatalogTable] = None): StreamingQueryWrapper = {
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()
 
@@ -256,7 +258,8 @@ class StreamingQueryManager private[sql] (
       df.sparkSession.sessionState.newHadoopConf(),
       trigger.isInstanceOf[ContinuousTrigger],
       analyzedPlan,
-      catalogAndIdent)
+      catalogAndIdent,
+      catalogTable)
 
     val analyzedStreamWritePlan =
       sparkSession.sessionState.executePlan(dataStreamWritePlan).analyzed
@@ -311,7 +314,8 @@ class StreamingQueryManager private[sql] (
       recoverFromCheckpointLocation: Boolean = true,
       trigger: Trigger = Trigger.ProcessingTime(0),
       triggerClock: Clock = new SystemClock(),
-      catalogAndIdent: Option[(TableCatalog, Identifier)] = None): StreamingQuery = {
+      catalogAndIdent: Option[(TableCatalog, Identifier)] = None,
+      catalogTable: Option[CatalogTable] = None): StreamingQuery = {
     val query = createQuery(
       userSpecifiedName,
       userSpecifiedCheckpointLocation,
@@ -323,7 +327,8 @@ class StreamingQueryManager private[sql] (
       recoverFromCheckpointLocation,
       trigger,
       triggerClock,
-      catalogAndIdent)
+      catalogAndIdent,
+      catalogTable)
     // scalastyle:on argcount
 
     // The following code block checks if a stream with the same name or id is running. Then it
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index fc9d8525f0a..68cd7f43a76 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -908,19 +908,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
 
 (3) LogicalRelation
-Arguments: parquet, [key#x, val#x], CatalogTable(
-Database: default
-Table: explain_temp1
-Created Time [not included in comparison]
-Last Access [not included in comparison]
-Created By [not included in comparison]
-Type: MANAGED
-Provider: PARQUET
-Location [not included in comparison]/{warehouse_dir}/explain_temp1
-Schema: root
--- key: integer (nullable = true)
--- val: integer (nullable = true)
-), false
+Arguments: parquet, [key#x, val#x], `default`.`explain_temp1`, false
 
 (4) SubqueryAlias
 Arguments: spark_catalog.default.explain_temp1
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index ea0c465a8cc..a2f2962cc9e 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -853,19 +853,7 @@ Output: []
 Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
 
 (3) LogicalRelation
-Arguments: parquet, [key#x, val#x], CatalogTable(
-Database: default
-Table: explain_temp1
-Created Time [not included in comparison]
-Last Access [not included in comparison]
-Created By [not included in comparison]
-Type: MANAGED
-Provider: PARQUET
-Location [not included in comparison]/{warehouse_dir}/explain_temp1
-Schema: root
--- key: integer (nullable = true)
--- val: integer (nullable = true)
-), false
+Arguments: parquet, [key#x, val#x], `default`.`explain_temp1`, false
 
 (4) SubqueryAlias
 Arguments: spark_catalog.default.explain_temp1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index d8c69b0984a..92819338843 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -192,7 +192,7 @@ abstract class FileStreamSourceTest
 
   protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = {
     query.logicalPlan.collect {
-      case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
+      case StreamingExecutionRelation(source, _, _) if source.isInstanceOf[FileStreamSource] =>
         source.asInstanceOf[FileStreamSource]
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6e1649380c3..62ef5824ed5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -107,7 +107,8 @@ class StreamSuite extends StreamTest {
   test("StreamingExecutionRelation.computeStats") {
     val memoryStream = MemoryStream[Int]
     val executionRelation = StreamingExecutionRelation(
-      memoryStream, memoryStream.encoder.schema.toAttributes)(memoryStream.sqlContext.sparkSession)
+      memoryStream, memoryStream.encoder.schema.toAttributes, None)(
+      memoryStream.sqlContext.sparkSession)
     assert(executionRelation.computeStats.sizeInBytes == spark.sessionState.conf.defaultSizeInBytes)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 61b3ec26a4d..7d69b441c82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatal
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingQueryWrapper}
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.streaming.sources.FakeScanBuilder
@@ -323,6 +324,127 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("explain with table on DSv1 data source") {
+    val tblSourceName = "tbl_src"
+    val tblTargetName = "tbl_target"
+    val tblSourceQualified = s"default.$tblSourceName"
+    val tblTargetQualified = s"`default`.`$tblTargetName`"
+
+    withTable(tblSourceQualified, tblTargetQualified) {
+      withTempDir { dir =>
+        sql(s"CREATE TABLE $tblSourceQualified (col1 string, col2 integer) USING parquet")
+        sql(s"CREATE TABLE $tblTargetQualified (col1 string, col2 integer) USING parquet")
+
+        sql(s"INSERT INTO $tblSourceQualified VALUES ('a', 1)")
+        sql(s"INSERT INTO $tblSourceQualified VALUES ('b', 2)")
+        sql(s"INSERT INTO $tblSourceQualified VALUES ('c', 3)")
+
+        val df = spark.readStream.table(tblSourceQualified)
+        val sq = df.writeStream
+          .format("parquet")
+          .option("checkpointLocation", dir.getCanonicalPath)
+          .toTable(tblTargetQualified)
+          .asInstanceOf[StreamingQueryWrapper].streamingQuery
+
+        try {
+          sq.processAllAvailable()
+
+          val explainWithoutExtended = sq.explainInternal(false)
+          // `extended = false` only displays the physical plan.
+          assert("FileScan".r
+            .findAllMatchIn(explainWithoutExtended).size === 1)
+          assert(tblSourceName.r
+            .findAllMatchIn(explainWithoutExtended).size === 1)
+
+          // We have marker node for DSv1 sink only in logical node. In physical plan, there is no
+          // information for DSv1 sink.
+
+          val explainWithExtended = sq.explainInternal(true)
+          // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+          // plan.
+          assert("Relation".r
+            .findAllMatchIn(explainWithExtended).size === 3)
+          assert("FileScan".r
+            .findAllMatchIn(explainWithExtended).size === 1)
+          // we don't compare with exact number since the number is also affected by SubqueryAlias
+          assert(tblSourceQualified.r
+            .findAllMatchIn(explainWithExtended).size >= 4)
+
+          assert("WriteToMicroBatchDataSourceV1".r
+            .findAllMatchIn(explainWithExtended).size === 2)
+          assert(tblTargetQualified.r
+            .findAllMatchIn(explainWithExtended).size >= 2)
+        } finally {
+          sq.stop()
+        }
+      }
+    }
+  }
+
+  test("explain with table on DSv2 data source") {
+    val tblSourceName = "tbl_src"
+    val tblTargetName = "tbl_target"
+    val tblSourceQualified = s"teststream.ns.$tblSourceName"
+    val tblTargetQualified = s"testcat.ns.$tblTargetName"
+
+    spark.sql("CREATE NAMESPACE teststream.ns")
+    spark.sql("CREATE NAMESPACE testcat.ns")
+
+    withTable(tblSourceQualified, tblTargetQualified) {
+      withTempDir { dir =>
+        sql(s"CREATE TABLE $tblSourceQualified (value int) USING foo")
+        sql(s"CREATE TABLE $tblTargetQualified (col1 string, col2 integer) USING foo")
+
+        val stream = MemoryStream[Int]
+        val testCatalog = spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+        val table = testCatalog.loadTable(Identifier.of(Array("ns"), tblSourceName))
+        table.asInstanceOf[InMemoryStreamTable].setStream(stream)
+
+        val df = spark.readStream.table(tblSourceQualified)
+          .select(lit('a'), $"value")
+        val sq = df.writeStream
+          .option("checkpointLocation", dir.getCanonicalPath)
+          .toTable(tblTargetQualified)
+          .asInstanceOf[StreamingQueryWrapper].streamingQuery
+
+        try {
+          stream.addData(1, 2, 3)
+
+          sq.processAllAvailable()
+
+          val explainWithoutExtended = sq.explainInternal(false)
+          // `extended = false` only displays the physical plan.
+          // we don't guarantee the table information is available in physical plan.
+          assert("MicroBatchScan".r
+            .findAllMatchIn(explainWithoutExtended).size === 1)
+          assert("WriteToDataSourceV2".r
+            .findAllMatchIn(explainWithoutExtended).size === 1)
+
+          val explainWithExtended = sq.explainInternal(true)
+          // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+          // plan.
+          assert("StreamingDataSourceV2Relation".r
+            .findAllMatchIn(explainWithExtended).size === 3)
+          // WriteToMicroBatchDataSource is used for both parsed and analyzed logical plan
+          assert("WriteToMicroBatchDataSource".r
+            .findAllMatchIn(explainWithExtended).size === 2)
+          // optimizer replaces WriteToMicroBatchDataSource to WriteToDataSourceV2
+          assert("WriteToDataSourceV2".r
+            .findAllMatchIn(explainWithExtended).size === 2)
+          assert("MicroBatchScan".r
+            .findAllMatchIn(explainWithExtended).size === 1)
+
+          assert(tblSourceQualified.r
+            .findAllMatchIn(explainWithExtended).size >= 3)
+          assert(tblTargetQualified.r
+            .findAllMatchIn(explainWithExtended).size >= 3)
+        } finally {
+          sq.stop()
+        }
+      }
+    }
+  }
+
   private def checkForStreamTable(dir: Option[File], tableName: String): Unit = {
     val memory = MemoryStream[Int]
     val dsw = memory.toDS().writeStream.format("parquet")

