You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/10/30 12:17:56 UTC
spark git commit: [SPARK-11393] [SQL] CoGroupedIterator should
respect the fact that GroupedIterator.hasNext is not idempotent
Repository: spark
Updated Branches:
refs/heads/master 59db9e9c3 -> 14d08b990
[SPARK-11393] [SQL] CoGroupedIterator should respect the fact that GroupedIterator.hasNext is not idempotent
When we cogroup 2 `GroupedIterator`s in `CoGroupedIterator`, if the right side is smaller, we will consume right data and keep the left data unchanged. Then we call `hasNext` which will call `left.hasNext`. This will make `GroupedIterator` generate an extra group as the previous one has not been comsumed yet.
Author: Wenchen Fan <we...@databricks.com>
Closes #9346 from cloud-fan/cogroup and squashes the following commits:
9be67c8 [Wenchen Fan] SPARK-11393
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14d08b99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14d08b99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14d08b99
Branch: refs/heads/master
Commit: 14d08b99085d4e609aeae0cf54d4584e860eb552
Parents: 59db9e9
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Oct 30 12:17:51 2015 +0100
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Oct 30 12:17:51 2015 +0100
----------------------------------------------------------------------
.../spark/sql/execution/CoGroupedIterator.scala | 14 +++++++-----
.../sql/execution/CoGroupedIteratorSuite.scala | 24 ++++++++++++++++++++
2 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/14d08b99/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
index ce58278..663bc90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CoGroupedIterator.scala
@@ -38,17 +38,19 @@ class CoGroupedIterator(
private var currentLeftData: (InternalRow, Iterator[InternalRow]) = _
private var currentRightData: (InternalRow, Iterator[InternalRow]) = _
- override def hasNext: Boolean = left.hasNext || right.hasNext
-
- override def next(): (InternalRow, Iterator[InternalRow], Iterator[InternalRow]) = {
- if (currentLeftData.eq(null) && left.hasNext) {
+ override def hasNext: Boolean = {
+ if (currentLeftData == null && left.hasNext) {
currentLeftData = left.next()
}
- if (currentRightData.eq(null) && right.hasNext) {
+ if (currentRightData == null && right.hasNext) {
currentRightData = right.next()
}
- assert(currentLeftData.ne(null) || currentRightData.ne(null))
+ currentLeftData != null || currentRightData != null
+ }
+
+ override def next(): (InternalRow, Iterator[InternalRow], Iterator[InternalRow]) = {
+ assert(hasNext)
if (currentLeftData.eq(null)) {
// left is null, right is not null, consume the right data.
http://git-wip-us.apache.org/repos/asf/spark/blob/14d08b99/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
index d1fe819..4ff96e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoGroupedIteratorSuite.scala
@@ -48,4 +48,28 @@ class CoGroupedIteratorSuite extends SparkFunSuite with ExpressionEvalHelper {
Nil
)
}
+
+ test("SPARK-11393: respect the fact that GroupedIterator.hasNext is not idempotent") {
+ val leftInput = Seq(create_row(2, "a")).iterator
+ val rightInput = Seq(create_row(1, 2L)).iterator
+ val leftGrouped = GroupedIterator(leftInput, Seq('i.int.at(0)), Seq('i.int, 's.string))
+ val rightGrouped = GroupedIterator(rightInput, Seq('i.int.at(0)), Seq('i.int, 'l.long))
+ val cogrouped = new CoGroupedIterator(leftGrouped, rightGrouped, Seq('i.int))
+
+ val result = cogrouped.map {
+ case (key, leftData, rightData) =>
+ assert(key.numFields == 1)
+ (key.getInt(0), leftData.toSeq, rightData.toSeq)
+ }.toSeq
+
+ assert(result ==
+ (1,
+ Seq.empty,
+ Seq(create_row(1, 2L))) ::
+ (2,
+ Seq(create_row(2, "a")),
+ Seq.empty) ::
+ Nil
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org