You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/11/04 19:03:30 UTC

[spark] branch branch-2.4 updated: [SPARK-29743][SQL] sample should set needCopyResult to true if its child is

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 19e55c3  [SPARK-29743][SQL] sample should set needCopyResult to true if its child is
19e55c3 is described below

commit 19e55c319c928d4b8d3b1fc9a20aa3e7d5798776
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Nov 4 10:56:37 2019 -0800

    [SPARK-29743][SQL] sample should set needCopyResult to true if its child is
    
    `SampleExec` has a bug that it sets `needCopyResult` to false as long as the `withReplacement` parameter is false. This causes problems if its child needs to copy the result, e.g. a join.
    
    to fix a correctness issue
    
    Yes, the result will be corrected.
    
    a new test
    
    Closes #26387 from cloud-fan/sample-bug.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 326b7893401b6bd57cf11f657386c0f9da00902a)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/execution/basicPhysicalOperators.scala    |  4 +++-
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala   | 10 ++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 4f86f3b..27aa9b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -280,7 +280,9 @@ case class SampleExec(
     child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
-  override def needCopyResult: Boolean = withReplacement
+  override def needCopyResult: Boolean = {
+    child.asInstanceOf[CodegenSupport].needCopyResult || withReplacement
+  }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
     val numOutput = metricTerm(ctx, "numOutputRows")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3d74206..c8a03c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2631,4 +2631,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val cast = sql("SELECT cast(struct(1, null) AS struct<a:int,b:int>)")
     checkAnswer(cast, Row(Row(1, null)) :: Nil)
   }
+
+  test("sample should not duplicated the input data") {
+    val df1 = spark.range(10).select($"id" as "id1", $"id" % 5 as "key1")
+    val df2 = spark.range(10).select($"id" as "id2", $"id" % 5 as "key2")
+    val sampled = df1.join(df2, $"key1" === $"key2")
+      .sample(0.5, 42)
+      .select("id1", "id2")
+    val idTuples = sampled.collect().map(row => row.getLong(0) -> row.getLong(1))
+    assert(idTuples.length == idTuples.toSet.size)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org