You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/10/30 12:17:56 UTC

spark git commit: [SPARK-11393] [SQL] CoGroupedIterator should respect the fact that GroupedIterator.hasNext is not idempotent

Repository: spark
Updated Branches:
  refs/heads/master 59db9e9c3 -> 14d08b990


[SPARK-11393] [SQL] CoGroupedIterator should respect the fact that GroupedIterator.hasNext is not idempotent

When we cogroup 2 `GroupedIterator`s in `CoGroupedIterator`, if the right side is smaller, we will consume right data and keep the left data unchanged. Then we call `hasNext` which will call `left.hasNext`. This will make `GroupedIterator` generate an extra group as the previous one has not been comsumed yet.

Author: Wenchen Fan <we...@databricks.com>

Closes #9346 from cloud-fan/cogroup and squashes the following commits:

9be67c8 [Wenchen Fan] SPARK-11393


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14d08b99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14d08b99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14d08b99

Branch: refs/heads/master
Commit: 14d08b99085d4e609aeae0cf54d4584e860eb552
Parents: 59db9e9
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Oct 30 12:17:51 2015 +0100
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Oct 30 12:17:51 2015 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/CoGroupedIterator.scala | 14 +++++++-----
 .../sql/execution/CoGroupedIteratorSuite.scala  | 24 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14d08b99/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
index ce58278..663bc90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
@@ -38,17 +38,19 @@ class CoGroupedIterator(
   private var currentLeftData: (InternalRow, Iterator[InternalRow]) = _
   private var currentRightData: (InternalRow, Iterator[InternalRow]) = _
 
-  override def hasNext: Boolean = left.hasNext || right.hasNext
-
-  override def next(): (InternalRow, Iterator[InternalRow], Iterator[InternalRow]) = {
-    if (currentLeftData.eq(null) && left.hasNext) {
+  override def hasNext: Boolean = {
+    if (currentLeftData == null && left.hasNext) {
       currentLeftData = left.next()
     }
-    if (currentRightData.eq(null) && right.hasNext) {
+    if (currentRightData == null && right.hasNext) {
       currentRightData = right.next()
     }
 
-    assert(currentLeftData.ne(null) || currentRightData.ne(null))
+    currentLeftData != null || currentRightData != null
+  }
+
+  override def next(): (InternalRow, Iterator[InternalRow], Iterator[InternalRow]) = {
+    assert(hasNext)
 
     if (currentLeftData.eq(null)) {
       // left is null, right is not null, consume the right data.

http://git-wip-us.apache.org/repos/asf/spark/blob/14d08b99/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
index d1fe819..4ff96e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
@@ -48,4 +48,28 @@ class CoGroupedIteratorSuite extends SparkFunSuite with ExpressionEvalHelper {
       Nil
     )
   }
+
+  test("SPARK-11393: respect the fact that GroupedIterator.hasNext is not idempotent") {
+    val leftInput = Seq(create_row(2, "a")).iterator
+    val rightInput = Seq(create_row(1, 2L)).iterator
+    val leftGrouped = GroupedIterator(leftInput, Seq('i.int.at(0)), Seq('i.int, 's.string))
+    val rightGrouped = GroupedIterator(rightInput, Seq('i.int.at(0)), Seq('i.int, 'l.long))
+    val cogrouped = new CoGroupedIterator(leftGrouped, rightGrouped, Seq('i.int))
+
+    val result = cogrouped.map {
+      case (key, leftData, rightData) =>
+        assert(key.numFields == 1)
+        (key.getInt(0), leftData.toSeq, rightData.toSeq)
+    }.toSeq
+
+    assert(result ==
+      (1,
+        Seq.empty,
+        Seq(create_row(1, 2L))) ::
+      (2,
+        Seq(create_row(2, "a")),
+        Seq.empty) ::
+      Nil
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org