Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/20 22:19:38 UTC

spark git commit: [SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf

Repository: spark
Updated Branches:
  refs/heads/master 95c95b71e -> caed89321


[SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf

## What changes were proposed in this pull request?

The checkpoint location for a Structured Streaming query can be defined per query through the `DataStreamWriter` options, but it can also be provided through the SparkSession configuration (`spark.sql.streaming.checkpointLocation`). A MemorySink should be able to recover from the checkpoint in both cases when the OutputMode is Complete; previously, recovery only worked when the location was passed as a writer option.
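
For illustration, a minimal sketch of the two ways a checkpoint location can be supplied. The SparkSession `spark`, the streaming DataFrame `events`, the query name, and the path are hypothetical placeholders, not code from this patch:

```scala
// Sketch only: `spark` is a SparkSession, `events` a streaming DataFrame.

// 1. Per-query: checkpoint location passed as a DataStreamWriter option.
events.groupBy("a").count()
  .writeStream
  .format("memory")
  .queryName("counts")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/chk")  // hypothetical path
  .start()

// 2. Session-wide: checkpoint location taken from the SparkSession conf.
//    Before this fix, a MemorySink could not recover from a checkpoint
//    provided this way.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/chk")
events.groupBy("a").count()
  .writeStream
  .format("memory")
  .queryName("counts")
  .outputMode("complete")
  .start()
```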

## How was this patch tested?

Unit tests in `DataStreamReaderWriterSuite`, covering both the writer-option and session-conf cases.

Author: Burak Yavuz <br...@gmail.com>

Closes #16342 from brkyvz/chk-rec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/caed8932
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/caed8932
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/caed8932

Branch: refs/heads/master
Commit: caed89321fdabe83e46451ca4e968f86481ad500
Parents: 95c95b7
Author: Burak Yavuz <br...@gmail.com>
Authored: Tue Dec 20 14:19:35 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Dec 20 14:19:35 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../test/DataStreamReaderWriterSuite.scala      | 32 +++++++++++++++-----
 2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/caed8932/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b3c600a..b7fc336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -223,7 +223,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")
-      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
+      val recoverFromChkpoint = outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         chkpointLoc,
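
The one-line change above drops the requirement that the checkpoint location be present in the writer options: in Complete mode the sink's contents can always be rebuilt from the recovered state, so `recoverFromChkpoint` is now true whenever the output mode is Complete, and the location itself is resolved later by `startQuery`. As a standalone sketch of the assumed precedence (writer option over session conf) — an illustration, not Spark's internal code:

```scala
// Standalone sketch of the assumed resolution order: the per-query writer
// option wins; otherwise the session-level setting applies.
def resolveCheckpointLocation(
    writerOptions: Map[String, String],
    sessionConf: Map[String, String]): Option[String] =
  writerOptions.get("checkpointLocation")
    .orElse(sessionConf.get("spark.sql.streaming.checkpointLocation"))
```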

http://git-wip-us.apache.org/repos/asf/spark/blob/caed8932/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index acac0bf..9de3da3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -470,24 +470,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     sq.stop()
   }
 
-  test("MemorySink can recover from a checkpoint in Complete Mode") {
+  private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = {
     import testImplicits._
     val ms = new MemoryStream[Int](0, sqlContext)
     val df = ms.toDF().toDF("a")
-    val checkpointLoc = newMetadataDir
-    val checkpointDir = new File(checkpointLoc, "offsets")
-    checkpointDir.mkdirs()
-    assert(checkpointDir.exists())
     val tableName = "test"
     def startQuery: StreamingQuery = {
-      df.groupBy("a")
+      val writer = df.groupBy("a")
         .count()
         .writeStream
         .format("memory")
         .queryName(tableName)
-        .option("checkpointLocation", checkpointLoc)
         .outputMode("complete")
-        .start()
+      if (provideInWriter) {
+        writer.option("checkpointLocation", chkLoc)
+      }
+      writer.start()
     }
     // no exception here
     val q = startQuery
@@ -513,6 +511,24 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     q2.stop()
   }
 
+  test("MemorySink can recover from a checkpoint in Complete Mode") {
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = true)
+  }
+
+  test("SPARK-18927: MemorySink can recover from a checkpoint provided in conf in Complete Mode") {
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointLoc) {
+      testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = false)
+    }
+  }
+
   test("append mode memory sink's do not support checkpoint recovery") {
     import testImplicits._
     val ms = new MemoryStream[Int](0, sqlContext)
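
For context, the recovery path these tests exercise looks roughly like the following sketch. It assumes a SparkSession `spark`; the query name and checkpoint path are placeholders, and it mirrors rather than reproduces the suite's code:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

// Checkpoint location supplied only via the session conf (hypothetical path).
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/chk")

val ms = new MemoryStream[Int](0, spark.sqlContext)
def startQuery() = ms.toDF().toDF("a")
  .groupBy("a").count()
  .writeStream
  .format("memory")
  .queryName("counts")
  .outputMode("complete")
  .start()

val q1 = startQuery()
ms.addData(1, 2, 3)
q1.processAllAvailable()
q1.stop()

// With SPARK-18927, this restart recovers the aggregation state from the
// checkpoint even though the location never appeared in the writer options.
val q2 = startQuery()
q2.processAllAvailable()
q2.stop()
```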


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org