You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/10/09 23:34:41 UTC

spark git commit: [SPARK-22230] Swap per-row order in state store restore.

Repository: spark
Updated Branches:
  refs/heads/master 155ab6347 -> 71c2b81aa


[SPARK-22230] Swap per-row order in state store restore.

## What changes were proposed in this pull request?
In state store restore, for each input row, emit the saved state (if any) before the row in the output iterator instead of after it.

This fixes an issue where agg(last('attr)) will forever return the last value of 'attr from the first microbatch.

## How was this patch tested?

New unit test in StreamingAggregationSuite ("SPARK-22230: last should change with new batches").

Author: Jose Torres <jo...@databricks.com>

Closes #19461 from joseph-torres/SPARK-22230.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71c2b81a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71c2b81a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71c2b81a

Branch: refs/heads/master
Commit: 71c2b81aa0e0db70013821f5512df1fbd8e59445
Parents: 155ab63
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Oct 9 16:34:39 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Mon Oct 9 16:34:39 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/statefulOperators.scala |  2 +-
 .../sql/streaming/StreamingAggregationSuite.scala   | 16 ++++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/71c2b81a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index fb960fb..0d85542 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -225,7 +225,7 @@ case class StateStoreRestoreExec(
             val key = getKey(row)
             val savedState = store.get(key)
             numOutputRows += 1
-            row +: Option(savedState).toSeq
+            Option(savedState).toSeq :+ row
           }
         }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/71c2b81a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 995cea3..fe7efa6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -520,6 +520,22 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
     }
   }
 
+  test("SPARK-22230: last should change with new batches") {
+    val input = MemoryStream[Int]
+
+    val aggregated = input.toDF().agg(last('value))
+    testStream(aggregated, OutputMode.Complete())(
+      AddData(input, 1, 2, 3),
+      CheckLastBatch(3),
+      AddData(input, 4, 5, 6),
+      CheckLastBatch(6),
+      AddData(input),
+      CheckLastBatch(6),
+      AddData(input, 0),
+      CheckLastBatch(0)
+    )
+  }
+
   /** Add blocks of data to the `BlockRDDBackedSource`. */
   case class AddBlockData(source: BlockRDDBackedSource, data: Seq[Int]*) extends AddData {
     override def addData(query: Option[StreamExecution]): (Source, Offset) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org