You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/23 16:52:31 UTC
spark git commit: [SPARK-24850][SQL] fix str representation of
CachedRDDBuilder
Repository: spark
Updated Branches:
refs/heads/master 08e315f63 -> 2edf17eff
[SPARK-24850][SQL] fix str representation of CachedRDDBuilder
## What changes were proposed in this pull request?
As of https://github.com/apache/spark/pull/21018, InMemoryRelation includes its cacheBuilder when logging query plans. This PR changes the string representation of the CachedRDDBuilder to not include the cached spark plan.
## How was this patch tested?
spark-shell, query:
```
var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
0 to 1 foreach { _ =>
df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
}
df_cached.explain
```
as of master results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#34)
+- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
+- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
,None)
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```
with this patch results in:
```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
+- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
+- *(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
: +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
: +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
: +- *(2) Project [A#10, B#11, B#35]
: +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
: :- *(2) Filter isnotnull(A#10)
: : +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
: : +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
: : +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
: +- *(1) Filter isnotnull(A#34)
: +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
: +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
: +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(A#86)
+- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
+- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas))
+- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<A:string,B:string>
```
Author: Onur Satici <os...@palantir.com>
Closes #21805 from onursatici/os/inmemoryrelation-str.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2edf17ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2edf17ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2edf17ef
Branch: refs/heads/master
Commit: 2edf17effd8b0aba61c95dddd5823ad7277d6c7d
Parents: 08e315f
Author: Onur Satici <os...@palantir.com>
Authored: Mon Jul 23 09:52:28 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon Jul 23 09:52:28 2018 -0700
----------------------------------------------------------------------
.../spark/sql/execution/columnar/InMemoryRelation.scala | 5 ++++-
.../scala/org/apache/spark/sql/DatasetCacheSuite.scala | 11 +++++++++++
2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2edf17ef/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 7c8faec..1a8fbac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
/**
@@ -207,4 +207,7 @@ case class InMemoryRelation(
}
override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
+
+ override def simpleString: String =
+ s"InMemoryRelation [${Utils.truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2edf17ef/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 5c6a021..44177e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -206,4 +206,15 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
// first time use, load cache
checkDataset(df5, Row(10))
}
+
+ test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+ val df = Seq(1).toDF("a").cache()
+ val outputStream = new java.io.ByteArrayOutputStream()
+ Console.withOut(outputStream) {
+ df.explain(false)
+ }
+ assert(outputStream.toString.replaceAll("#\\d+", "#x").contains(
+ "InMemoryRelation [a#x], StorageLevel(disk, memory, deserialized, 1 replicas)"
+ ))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org