You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/20 22:19:38 UTC
spark git commit: [SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf
Repository: spark
Updated Branches:
refs/heads/master 95c95b71e -> caed89321
[SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf
## What changes were proposed in this pull request?
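The checkpoint location for a Structured Streaming query can be set per query through the `DataStreamWriter` options, but it can also be provided through the SparkSession configuration (`SQLConf.CHECKPOINT_LOCATION`). Previously, `MemorySink` enabled checkpoint recovery only when `checkpointLocation` was passed as a writer option, so a location supplied through the session configuration did not enable recovery. A `MemorySink` should be able to recover from its checkpoint in both cases when the OutputMode is Complete.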
## How was this patch tested?
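Unit tests in `DataStreamReaderWriterSuite`. They cover a checkpoint location provided via the writer option and one provided via the session configuration.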
Author: Burak Yavuz <br...@gmail.com>
Closes #16342 from brkyvz/chk-rec.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/caed8932
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/caed8932
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/caed8932
Branch: refs/heads/master
Commit: caed89321fdabe83e46451ca4e968f86481ad500
Parents: 95c95b7
Author: Burak Yavuz <br...@gmail.com>
Authored: Tue Dec 20 14:19:35 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Dec 20 14:19:35 2016 -0800
----------------------------------------------------------------------
.../spark/sql/streaming/DataStreamWriter.scala | 2 +-
.../test/DataStreamReaderWriterSuite.scala | 32 +++++++++++++++-----
2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/caed8932/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b3c600a..b7fc336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -223,7 +223,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
val chkpointLoc = extraOptions.get("checkpointLocation")
- val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
+ val recoverFromChkpoint = outputMode == OutputMode.Complete()
val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
chkpointLoc,
http://git-wip-us.apache.org/repos/asf/spark/blob/caed8932/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index acac0bf..9de3da3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -470,24 +470,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
sq.stop()
}
- test("MemorySink can recover from a checkpoint in Complete Mode") {
+ private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = {
import testImplicits._
val ms = new MemoryStream[Int](0, sqlContext)
val df = ms.toDF().toDF("a")
- val checkpointLoc = newMetadataDir
- val checkpointDir = new File(checkpointLoc, "offsets")
- checkpointDir.mkdirs()
- assert(checkpointDir.exists())
val tableName = "test"
def startQuery: StreamingQuery = {
- df.groupBy("a")
+ val writer = df.groupBy("a")
.count()
.writeStream
.format("memory")
.queryName(tableName)
- .option("checkpointLocation", checkpointLoc)
.outputMode("complete")
- .start()
+ if (provideInWriter) {
+ writer.option("checkpointLocation", chkLoc)
+ }
+ writer.start()
}
// no exception here
val q = startQuery
@@ -513,6 +511,24 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
q2.stop()
}
+ test("MemorySink can recover from a checkpoint in Complete Mode") {
+ val checkpointLoc = newMetadataDir
+ val checkpointDir = new File(checkpointLoc, "offsets")
+ checkpointDir.mkdirs()
+ assert(checkpointDir.exists())
+ testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = true)
+ }
+
+ test("SPARK-18927: MemorySink can recover from a checkpoint provided in conf in Complete Mode") {
+ val checkpointLoc = newMetadataDir
+ val checkpointDir = new File(checkpointLoc, "offsets")
+ checkpointDir.mkdirs()
+ assert(checkpointDir.exists())
+ withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointLoc) {
+ testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = false)
+ }
+ }
+
test("append mode memory sink's do not support checkpoint recovery") {
import testImplicits._
val ms = new MemoryStream[Int](0, sqlContext)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org