You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/11/04 19:03:30 UTC
[spark] branch branch-2.4 updated: [SPARK-29743][SQL] sample should
set needCopyResult to true if its child is
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 19e55c3 [SPARK-29743][SQL] sample should set needCopyResult to true if its child is
19e55c3 is described below
commit 19e55c319c928d4b8d3b1fc9a20aa3e7d5798776
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Nov 4 10:56:37 2019 -0800
[SPARK-29743][SQL] sample should set needCopyResult to true if its child is
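`SampleExec` has a bug: it sets `needCopyResult` to false whenever the `withReplacement` parameter is false, ignoring what its child requires. This causes incorrect results when the child needs its result rows to be copied (e.g. a join), because downstream operators can then see reused row buffers and return duplicated rows.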
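Why are the changes needed? To fix a correctness issue.
Does this PR introduce any user-facing change? Yes, the result will be corrected.
How was this patch tested? A new test in `DataFrameSuite`.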
Closes #26387 from cloud-fan/sample-bug.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 326b7893401b6bd57cf11f657386c0f9da00902a)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../apache/spark/sql/execution/basicPhysicalOperators.scala | 4 +++-
.../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 10 ++++++++++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 4f86f3b..27aa9b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -280,7 +280,9 @@ case class SampleExec(
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}
- override def needCopyResult: Boolean = withReplacement
+ override def needCopyResult: Boolean = {
+ child.asInstanceOf[CodegenSupport].needCopyResult || withReplacement
+ }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
val numOutput = metricTerm(ctx, "numOutputRows")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3d74206..c8a03c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2631,4 +2631,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val cast = sql("SELECT cast(struct(1, null) AS struct<a:int,b:int>)")
checkAnswer(cast, Row(Row(1, null)) :: Nil)
}
+
+ test("sample should not duplicated the input data") {
+ val df1 = spark.range(10).select($"id" as "id1", $"id" % 5 as "key1")
+ val df2 = spark.range(10).select($"id" as "id2", $"id" % 5 as "key2")
+ val sampled = df1.join(df2, $"key1" === $"key2")
+ .sample(0.5, 42)
+ .select("id1", "id2")
+ val idTuples = sampled.collect().map(row => row.getLong(0) -> row.getLong(1))
+ assert(idTuples.length == idTuples.toSet.size)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org