You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/26 00:22:40 UTC

spark git commit: [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

Repository: spark
Updated Branches:
  refs/heads/master 17f469bc8 -> d2e7deb59


[SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

## What changes were proposed in this pull request?
```Scala
      val udf1 = udf({(x: Int, y: Int) => x + y})
      val df = spark.range(0, 3).toDF("a")
        .withColumn("b", udf1($"a", udf1($"a", lit(10))))
      df.cache()
      df.write.saveAsTable("t")
```
Cache is not being used because the plans do not match with the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent.

## How was this patch tested?
Added a test.

Also found a bug in the DSV1 write path. This is not a regression. Thus, opened a separate JIRA https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li <ga...@gmail.com>

Closes #21821 from gatorsmile/testMaster22.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2e7deb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2e7deb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2e7deb5

Branch: refs/heads/master
Commit: d2e7deb59f641e93778b763d5396f73d38f9a785
Parents: 17f469b
Author: Xiao Li <ga...@gmail.com>
Authored: Wed Jul 25 17:22:37 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Wed Jul 25 17:22:37 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 10 +++--
 .../spark/sql/execution/command/ddl.scala       |  7 ++--
 .../scala/org/apache/spark/sql/UDFSuite.scala   | 42 +++++++++++++++++++-
 3 files changed, 51 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2e7deb5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b9fa43f..39c0e10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
           if (writer.isPresent) {
             runCommand(df.sparkSession, "save") {
-              WriteToDataSourceV2(writer.get(), df.logicalPlan)
+              WriteToDataSourceV2(writer.get(), df.planWithBarrier)
             }
           }
 
@@ -275,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+        options = extraOptions.toMap).planForWriting(mode, df.planWithBarrier)
     }
   }
 
@@ -323,7 +323,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
-        query = df.logicalPlan,
+        query = df.planWithBarrier,
         overwrite = mode == SaveMode.Overwrite,
         ifPartitionNotExists = false)
     }
@@ -459,7 +459,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec)
 
-    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    runCommand(df.sparkSession, "saveAsTable") {
+      CreateTable(tableDesc, mode, Some(df.planWithBarrier))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d2e7deb5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 04bf8c6..c7f7e4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateBarriers, NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -891,8 +891,9 @@ object DDLUtils {
    * Throws exception if outputPath tries to overwrite inputpath.
    */
   def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
-    val inputPaths = query.collect {
-      case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
+    val inputPaths = EliminateBarriers(query).collect {
+      case LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+        r.location.rootPaths
     }.flatten
 
     if (inputPaths.contains(outputPath)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d2e7deb5/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index d807457..30dca94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -19,11 +19,16 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand}
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.functions.{lit, udf}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types.{DataTypes, DoubleType}
+import org.apache.spark.sql.util.QueryExecutionListener
+
 
 private case class FunctionResult(f1: String, f2: String)
 
@@ -325,6 +330,41 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("cached Data should be used in the write path") {
+    withTable("t") {
+      withTempPath { path =>
+        var numTotalCachedHit = 0
+        val listener = new QueryExecutionListener {
+          override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+
+          override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+            qe.withCachedData match {
+              case c: CreateDataSourceTableAsSelectCommand
+                  if c.query.isInstanceOf[InMemoryRelation] =>
+                numTotalCachedHit += 1
+              case i: InsertIntoHadoopFsRelationCommand
+                  if i.query.isInstanceOf[InMemoryRelation] =>
+                numTotalCachedHit += 1
+              case _ =>
+            }
+          }
+        }
+        spark.listenerManager.register(listener)
+
+        val udf1 = udf({ (x: Int, y: Int) => x + y })
+        val df = spark.range(0, 3).toDF("a")
+          .withColumn("b", udf1($"a", lit(10)))
+        df.cache()
+        df.write.saveAsTable("t")
+        assert(numTotalCachedHit == 1, "expected to be cached in saveAsTable")
+        df.write.insertInto("t")
+        assert(numTotalCachedHit == 2, "expected to be cached in insertInto")
+        df.write.save(path.getCanonicalPath)
+        assert(numTotalCachedHit == 3, "expected to be cached in save for native")
+      }
+    }
+  }
+
   test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
     val udf1 = udf({(x: Int, y: Int) => x + y})
     val df = spark.range(0, 3).toDF("a")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org