You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/08/19 16:31:59 UTC

[spark] branch branch-3.0 updated: [SPARK-28863][SQL] Introduce AlreadyPlanned to prevent reanalysis of V1FallbackWriters

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c4a12f2  [SPARK-28863][SQL] Introduce AlreadyPlanned to prevent reanalysis of V1FallbackWriters
c4a12f2 is described below

commit c4a12f22f8656b59bc9a341f6ed008167983d128
Author: Burak Yavuz <br...@gmail.com>
AuthorDate: Wed Aug 19 16:25:35 2020 +0000

    [SPARK-28863][SQL] Introduce AlreadyPlanned to prevent reanalysis of V1FallbackWriters
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a LogicalNode AlreadyPlanned, and related physical plan and preparation rule.
    
    With the DataSourceV2 write operations, we have a way to fallback to the V1 writer APIs using InsertableRelation. The gross part is that we're in physical land, but the InsertableRelation takes a logical plan, so we have to pass the logical plans to these physical nodes, and then potentially go through re-planning. This re-planning can cause issues for an already optimized plan.
    
    A useful primitive could be specifying that a plan is ready for execution through a logical node AlreadyPlanned. This would wrap a physical plan, and then we can go straight to execution.
    
    ### Why are the changes needed?
    
    To avoid having a physical plan that is disconnected from the physical plan that is being executed in V1WriteFallback execution. When a physical plan node executes a logical plan, the inner query is not connected to the running physical plan. The physical plan that actually runs is not visible through the Spark UI and its metrics are not exposed. In some cases, the EXPLAIN plan doesn't show it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Nope
    
    ### How was this patch tested?
    
    V1FallbackWriterSuite tests that writes still work
    
    Closes #29469 from brkyvz/alreadyAnalyzed2.
    
    Lead-authored-by: Burak Yavuz <br...@gmail.com>
    Co-authored-by: Burak Yavuz <bu...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 278d0dd25bc1479ecda42d6f722106e4763edfae)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/AlreadyPlanned.scala       | 61 +++++++++++++++++
 .../spark/sql/execution/SparkStrategies.scala      |  1 +
 .../datasources/v2/DataSourceV2Strategy.scala      | 10 ++-
 .../datasources/v2/V1FallbackWriters.scala         | 22 +++---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  4 --
 .../spark/sql/connector/V1WriteFallbackSuite.scala | 42 ++++++++++++
 .../spark/sql/execution/AlreadyPlannedSuite.scala  | 80 ++++++++++++++++++++++
 7 files changed, 196 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala
new file mode 100644
index 0000000..9dd956a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AlreadyPlanned.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+
+/**
+ * A special node that allows skipping query planning all the way to physical execution. This node
+ * can be used when a query was planned to the physical level, but we had to go back to logical plan
+ * land for some reason (e.g. V1 DataSource write execution). This will allow the metrics, and the
+ * query plan to properly appear as part of the query execution.
+ */
+case class AlreadyPlanned(physicalPlan: SparkPlan) extends LogicalPlan with MultiInstanceRelation {
+  override def children: Seq[LogicalPlan] = Nil
+  override lazy val resolved: Boolean = true
+  override val output: Seq[Attribute] = physicalPlan.output
+  override def newInstance(): LogicalPlan = {
+    val newAttrs = output.map(a => a.exprId -> a.newInstance())
+    val attrMap = newAttrs.toMap
+    val projections = physicalPlan.output.map { o =>
+      Alias(o, o.name)(attrMap(o.exprId).exprId, o.qualifier, Option(o.metadata))
+    }
+    AlreadyPlanned(ProjectExec(projections, physicalPlan))
+  }
+}
+
+/** Query execution that skips re-analysis and planning. */
+class AlreadyPlannedExecution(
+    session: SparkSession,
+    plan: AlreadyPlanned) extends QueryExecution(session, plan) {
+  override lazy val analyzed: LogicalPlan = plan
+  override lazy val optimizedPlan: LogicalPlan = plan
+  override lazy val sparkPlan: SparkPlan = plan.physicalPlan
+  override lazy val executedPlan: SparkPlan = plan.physicalPlan
+}
+
+object AlreadyPlanned {
+  def dataFrame(sparkSession: SparkSession, query: SparkPlan): DataFrame = {
+    val qe = new AlreadyPlannedExecution(sparkSession, AlreadyPlanned(query))
+    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f836deb..ec6e365 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -678,6 +678,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case planned: AlreadyPlanned => planned.physicalPlan :: Nil
       case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
       case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index cca80c0..f2ff006 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -122,10 +122,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
+          AtomicCreateTableAsSelectExec(staging, ident, parts, planLater(query),
             propsWithOwner, writeOptions, ifNotExists) :: Nil
         case _ =>
-          CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
+          CreateTableAsSelectExec(catalog, ident, parts, planLater(query),
             propsWithOwner, writeOptions, ifNotExists) :: Nil
       }
 
@@ -152,7 +152,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
             staging,
             ident,
             parts,
-            query,
             planLater(query),
             propsWithOwner,
             writeOptions,
@@ -162,7 +161,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
             catalog,
             ident,
             parts,
-            query,
             planLater(query),
             propsWithOwner,
             writeOptions,
@@ -172,7 +170,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          AppendDataExecV1(v1, writeOptions.asOptions, query) :: Nil
+          AppendDataExecV1(v1, writeOptions.asOptions, planLater(query)) :: Nil
         case v2 =>
           AppendDataExec(v2, writeOptions.asOptions, planLater(query)) :: Nil
       }
@@ -186,7 +184,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }.toArray
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil
+          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, planLater(query)) :: Nil
         case v2 =>
           OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, planLater(query)) :: Nil
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
index 7502a87..28c4e28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala
@@ -20,14 +20,11 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.UUID
 
 import org.apache.spark.SparkException
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{AlreadyPlanned, SparkPlan}
 import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -39,7 +36,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 case class AppendDataExecV1(
     table: SupportsWrite,
     writeOptions: CaseInsensitiveStringMap,
-    plan: LogicalPlan) extends V1FallbackWriters {
+    query: SparkPlan) extends V1FallbackWriters {
 
   override protected def run(): Seq[InternalRow] = {
     writeWithV1(newWriteBuilder().buildForV1Write())
@@ -61,7 +58,7 @@ case class OverwriteByExpressionExecV1(
     table: SupportsWrite,
     deleteWhere: Array[Filter],
     writeOptions: CaseInsensitiveStringMap,
-    plan: LogicalPlan) extends V1FallbackWriters {
+    query: SparkPlan) extends V1FallbackWriters {
 
   private def isTruncate(filters: Array[Filter]): Boolean = {
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
@@ -84,7 +81,7 @@ case class OverwriteByExpressionExecV1(
 /** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
 sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
   override def output: Seq[Attribute] = Nil
-  override final def children: Seq[SparkPlan] = Nil
+  override final def children: Seq[SparkPlan] = Seq(query)
 
   def table: SupportsWrite
   def writeOptions: CaseInsensitiveStringMap
@@ -100,11 +97,9 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
   protected def newWriteBuilder(): V1WriteBuilder = {
     val info = LogicalWriteInfoImpl(
       queryId = UUID.randomUUID().toString,
-      schema = plan.schema,
+      schema = query.schema,
       options = writeOptions)
-    val writeBuilder = table.newWriteBuilder(info)
-
-    writeBuilder.asV1Builder
+    table.newWriteBuilder(info).asV1Builder
   }
 }
 
@@ -112,11 +107,10 @@ sealed trait V1FallbackWriters extends V2CommandExec with SupportsV1Write {
  * A trait that allows Tables that use V1 Writer interfaces to append data.
  */
 trait SupportsV1Write extends SparkPlan {
-  // TODO: We should be able to work on SparkPlans at this point.
-  def plan: LogicalPlan
+  def query: SparkPlan
 
   protected def writeWithV1(relation: InsertableRelation): Seq[InternalRow] = {
-    relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
+    relation.insert(AlreadyPlanned.dataFrame(sqlContext.sparkSession, query), overwrite = false)
     Nil
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 616e18e..bf96132 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -62,7 +62,6 @@ case class CreateTableAsSelectExec(
     catalog: TableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
-    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
@@ -119,7 +118,6 @@ case class AtomicCreateTableAsSelectExec(
     catalog: StagingTableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
-    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
@@ -153,7 +151,6 @@ case class ReplaceTableAsSelectExec(
     catalog: TableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
-    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
@@ -219,7 +216,6 @@ case class AtomicReplaceTableAsSelectExec(
     catalog: StagingTableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
-    plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 10ed204..4b52a4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -25,10 +25,14 @@ import scala.collection.mutable
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -124,6 +128,23 @@ class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with Before
     }
     assert(e3.getMessage.contains("schema"))
   }
+
+  test("fallback writes should only analyze plan once") {
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    try {
+      val session = SparkSession.builder()
+        .master("local[1]")
+        .withExtensions(_.injectPostHocResolutionRule(_ => OnlyOnceRule))
+        .config(V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[V1FallbackTableCatalog].getName)
+        .getOrCreate()
+      val df = session.createDataFrame(Seq((1, "x"), (2, "y"), (3, "z")))
+      df.write.mode("append").option("name", "t1").format(v2Format).saveAsTable("test")
+    } finally {
+      SparkSession.setActiveSession(spark)
+      SparkSession.setDefaultSession(spark)
+    }
+  }
 }
 
 class V1WriteFallbackSessionCatalogSuite
@@ -318,3 +339,24 @@ class InMemoryTableWithV1Fallback(
     }
   }
 }
+
+/** A rule that fails if a query plan is analyzed twice. */
+object OnlyOnceRule extends Rule[LogicalPlan] {
+  private val tag = TreeNodeTag[String]("test")
+  private val counts = new mutable.HashMap[LogicalPlan, Int]()
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (plan.getTagValue(tag).isEmpty) {
+      plan.setTagValue(tag, "abc")
+      plan
+    } else {
+      val cnt = counts.getOrElseUpdate(plan, 0) + 1
+      // This rule will be run as injectPostHocResolutionRule, and is supposed to be run only twice.
+      // Once during planning and once during checkBatchIdempotence
+      assert(cnt <= 1, "This rule shouldn't have been called again")
+      counts.put(plan, cnt)
+      plan
+    }
+
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
new file mode 100644
index 0000000..9152dcf
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AlreadyPlannedSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.test.SharedSparkSession
+
+class AlreadyPlannedSuite extends SparkPlanTest with SharedSparkSession {
+
+  import testImplicits._
+
+  test("simple execution") {
+    val df = spark.range(10)
+    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+
+    checkAnswer(planned, identity, df.toDF().collect())
+  }
+
+  test("planning on top works - projection") {
+    val df = spark.range(10)
+    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+
+    checkAnswer(
+      planned.withColumn("data", 'id + 1),
+      identity,
+      df.withColumn("data", 'id + 1).collect())
+  }
+
+  test("planning on top works - filter") {
+    val df = spark.range(10)
+    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+
+    checkAnswer(planned.where('id < 5), identity, df.where('id < 5).toDF().collect())
+  }
+
+  test("planning on top works - joins") {
+    val df = spark.range(10)
+    val planned = AlreadyPlanned.dataFrame(spark, df.queryExecution.sparkPlan)
+
+    val plannedLeft = planned.alias("l")
+    val dfLeft = df.alias("l")
+    val plannedRight = planned.alias("r")
+    val dfRight = df.alias("r")
+
+    checkAnswer(
+      plannedLeft.where('id < 3).join(plannedRight, Seq("id")),
+      identity,
+      dfLeft.where('id < 3).join(dfRight, Seq("id")).collect())
+
+    checkAnswer(
+      plannedLeft.where('id < 3).join(plannedRight, plannedLeft("id") === plannedRight("id")),
+      identity,
+      dfLeft.where('id < 3).join(dfRight, dfLeft("id") === dfRight("id")).collect())
+
+    checkAnswer(
+      plannedLeft.join(plannedRight, Seq("id")).where('id < 3),
+      identity,
+      dfLeft.join(dfRight, Seq("id")).where('id < 3).collect())
+
+    checkAnswer(
+      plannedLeft.join(plannedRight, plannedLeft("id") === plannedRight("id")).where($"l.id" < 3),
+      identity,
+      dfLeft.join(dfRight, dfLeft("id") === dfRight("id")).where($"l.id" < 3).collect())
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org