You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/11/15 21:09:44 UTC

spark git commit: [SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints

Repository: spark
Updated Branches:
  refs/heads/master 6f9e598cc -> 2afdaa980


[SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints

## What changes were proposed in this pull request?

It would be nice if memory sinks could also recover from checkpoints. For correctness reasons, the only time we should support this is in `Complete` output mode. We can support it in `Complete` mode because the output of the StateStore is already persisted in the checkpoint directory.

## How was this patch tested?

Unit test

Author: Burak Yavuz <br...@gmail.com>

Closes #15801 from brkyvz/mem-stream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2afdaa98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2afdaa98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2afdaa98

Branch: refs/heads/master
Commit: 2afdaa9805f44b45242978eab9a9623d31dddbf3
Parents: 6f9e598
Author: Burak Yavuz <br...@gmail.com>
Authored: Tue Nov 15 13:09:29 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Nov 15 13:09:29 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  6 +-
 .../test/DataStreamReaderWriterSuite.scala      | 65 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2afdaa98/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b959444..daed1dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -222,14 +222,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
+      val chkpointLoc = extraOptions.get("checkpointLocation")
+      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
-        extraOptions.get("checkpointLocation"),
+        chkpointLoc,
         df,
         sink,
         outputMode,
         useTempCheckpointLocation = true,
-        recoverFromCheckpointLocation = false,
+        recoverFromCheckpointLocation = recoverFromChkpoint,
         trigger = trigger)
       resultDf.createOrReplaceTempView(query.name)
       query

http://git-wip-us.apache.org/repos/asf/spark/blob/2afdaa98/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f099439..5630464 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming.test
 
+import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
@@ -467,4 +468,68 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     val sq = df.writeStream.format("console").start()
     sq.stop()
   }
+
+  test("MemorySink can recover from a checkpoint in Complete Mode") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    val tableName = "test"
+    def startQuery: StreamingQuery = {
+      df.groupBy("a")
+        .count()
+        .writeStream
+        .format("memory")
+        .queryName(tableName)
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("complete")
+        .start()
+    }
+    // no exception here
+    val q = startQuery
+    ms.addData(0, 1)
+    q.processAllAvailable()
+    q.stop()
+
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 1), Row(1, 1))
+    )
+    spark.sql(s"drop table $tableName")
+    // verify table is dropped
+    intercept[AnalysisException](spark.table(tableName).collect())
+    val q2 = startQuery
+    ms.addData(0)
+    q2.processAllAvailable()
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 2), Row(1, 1))
+    )
+
+    q2.stop()
+  }
+
+  test("append mode memory sink's do not support checkpoint recovery") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+
+    val e = intercept[AnalysisException] {
+      df.writeStream
+        .format("memory")
+        .queryName("test")
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("append")
+        .start()
+    }
+    assert(e.getMessage.contains("does not support recovering"))
+    assert(e.getMessage.contains("checkpoint location"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org