You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by attilapiros <gi...@git.apache.org> on 2018/08/30 14:55:28 UTC
[GitHub] spark pull request #20958: [SPARK-23844][SS] Fix socket source honors recove...
Github user attilapiros commented on a diff in the pull request:
https://github.com/apache/spark/pull/20958#discussion_r214063180
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
@@ -256,6 +257,58 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
}
}
+ test("fail on recovery - true") {
+ val checkpointDir = Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
+ runSocketStream(checkpointDir, true, Seq(("hello", "hello"), ("world", "world")))
+
+ // Rerun socket stream will throw an exception, because it wrongly honors the recovered offsets.
+ val exception = intercept[StreamingQueryException](
+ runSocketStream(checkpointDir, true, Seq(("hello", "hello"), ("world", "world"))))
+ assert(exception.getMessage.contains(
+ "terminated with exception: Offsets committed out of order"))
+ }
+
+ test("fail on recovery - false") {
+ val checkpointDir = Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
+ runSocketStream(checkpointDir, false, Seq(("hello", "hello"), ("world", "world")))
+
+ // Rerun socket stream will not throw exception
+ runSocketStream(checkpointDir, false, Seq(("hello", "hello"), ("world", "world")))
+ }
+
+ private def runSocketStream(
+ chkpointDir: String,
+ failOnRecovery: Boolean,
+ inputAndRets: Seq[(String, String)]): Unit = withSQLConf(
+ "spark.sql.streaming.unsupportedOperationCheck" -> "false") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ try {
+ val ref = spark
+ import ref.implicits._
+ val socket = spark
+ .readStream
+ .format("socket")
+ .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
+ .option("failonrecovery", failOnRecovery.toString)
+ .load()
+ .as[String]
+
+ val actions = Seq(StartStream(checkpointLocation = chkpointDir)) ++
+ inputAndRets.flatMap {
+ case (input, ret) => Seq(AddSocketData(input), CheckLastBatch(ret))
+ } ++ Seq(StopStream)
+ testStream(socket)(actions : _*)
+ } finally {
+ if (serverThread != null) {
+ serverThread.interrupt()
+ serverThread.join()
+ serverThread = null
--- End diff --
Nit: the `serverThread = null` assignment is not needed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org