Posted to issues@spark.apache.org by "Terry Kim (Jira)" <ji...@apache.org> on 2020/03/07 00:47:00 UTC
[jira] [Updated] (SPARK-31078) outputOrdering should handle aliases correctly
[ https://issues.apache.org/jira/browse/SPARK-31078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Terry Kim updated SPARK-31078:
------------------------------
Description:
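Currently, `outputOrdering` doesn't respect aliases: the ordering a projection passes up from its child is still expressed on the original attribute (`i#8`), not on the alias (`ii#10`). For example, the following self-join of a bucketed, sorted table against an aliased projection of itself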
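{code:java}
// Runs inside a Spark SQL test suite that provides withSQLConf and
// spark.implicits._ (e.g. a QueryTest with SharedSparkSession).
import org.apache.spark.sql.internal.SQLConf

// Threshold 0: the tables are not broadcast, so a SortMergeJoin is planned.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  // Write a table bucketed (8 buckets) and sorted by i.
  val df = (0 until 20).toDF("i").as("df")
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  // Same data, with i renamed to ii through an alias.
  val t2 = t1.selectExpr("i as ii")
  t1.join(t2, t1("i") === t2("ii")).explain()
}
{code}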
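would produce an unnecessary sort node on the right side of the join, even though the left side, which reads the same bucketed table, needs no sort: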
{code:java}
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#8 AS ii#10]
      +- *(2) Filter isnotnull(i#8)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
{code}
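The gap can be illustrated with a small, self-contained Scala model. `Attr`, `Alias`, `SortOrder` and the functions below are hypothetical simplifications, not Catalyst classes, and the projection list is assumed to contain only aliases. `satisfies` stands in for the planner's ordering check and compares attributes by expression ID. The naive version passes the child's ordering through unchanged. The alias-aware version rewrites each sort key through the projection's aliases, so `i#8 ASC` becomes `ii#10 ASC` and the join's required ordering is met without an extra Sort. This is a sketch of the idea, not the actual fix.

{code:java}
// Hypothetical, simplified stand-ins for attributes, aliases and sort orders.
// These are NOT Spark's Catalyst classes.
final case class Attr(name: String, exprId: Long)
final case class Alias(child: Attr, name: String, exprId: Long) {
  def toAttribute: Attr = Attr(name, exprId)
}
final case class SortOrder(attr: Attr, ascending: Boolean = true)

object AliasAwareOrderingSketch {
  // Naive behaviour: a projection reports its child's ordering unchanged,
  // so the sort keys still refer to the child's attributes (e.g. i#8).
  def naiveOutputOrdering(childOrdering: Seq[SortOrder], projectList: Seq[Alias]): Seq[SortOrder] =
    childOrdering

  // Alias-aware behaviour: a sort key whose attribute is aliased in the
  // projection list is rewritten to the alias's output attribute (e.g. ii#10).
  def aliasAwareOutputOrdering(childOrdering: Seq[SortOrder], projectList: Seq[Alias]): Seq[SortOrder] = {
    val aliasMap: Map[Long, Attr] =
      projectList.map(a => a.child.exprId -> a.toAttribute).toMap
    childOrdering.map(o => aliasMap.get(o.attr.exprId).fold(o)(alias => o.copy(attr = alias)))
  }

  // The required ordering is satisfied if it is a prefix of the provided one,
  // comparing attributes by exprId (a stand-in for semantic equality).
  def satisfies(provided: Seq[SortOrder], required: Seq[SortOrder]): Boolean =
    required.length <= provided.length &&
      required.zip(provided).forall { case (r, p) =>
        r.attr.exprId == p.attr.exprId && r.ascending == p.ascending
      }

  def main(args: Array[String]): Unit = {
    val i = Attr("i", 8)
    val project = Seq(Alias(i, "ii", 10))          // Project [i#8 AS ii#10]
    val childOrdering = Seq(SortOrder(i))           // bucketed scan sorted by i#8
    val required = Seq(SortOrder(Attr("ii", 10)))   // join key ii#10 ASC

    println(satisfies(naiveOutputOrdering(childOrdering, project), required))      // false: a Sort is added
    println(satisfies(aliasAwareOutputOrdering(childOrdering, project), required)) // true: no Sort needed
  }
}
{code}

In this model, rewriting the ordering through the aliases is enough to show why the extra Sort disappears. A real implementation would also have to handle projection lists that mix plain attributes and aliases.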
was:
Currently, `outputOrdering` doesn't respect aliases. Thus, the following would produce an unnecessary sort node:
{code:java}
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
val df = (0 until 20).toDF("i").as("df")
df.repartition(8, df("i")).write.format("parquet")
.bucketBy(8, "i").sortBy("i").saveAsTable("t")
val t1 = spark.table("t")
val t2 = t1.selectExpr("i as ii")
t1.join(t2, t1("i") === t2("ii")).explain
}
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#8 AS ii#10]
      +- *(2) Filter isnotnull(i#8)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#8] Batched: true, DataFilters: [isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
{code}
> outputOrdering should handle aliases correctly
> ----------------------------------------------
>
> Key: SPARK-31078
> URL: https://issues.apache.org/jira/browse/SPARK-31078
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Terry Kim
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)