You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by attilapiros <gi...@git.apache.org> on 2018/08/30 14:55:28 UTC

[GitHub] spark pull request #20958: [SPARK-23844][SS] Fix socket source honors recove...

Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20958#discussion_r214063180
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
    @@ -256,6 +257,58 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
         }
       }
     
    +  test("fail on recovery - true") {
    +    val checkpointDir = Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
    +    runSocketStream(checkpointDir, true, Seq(("hello", "hello"), ("world", "world")))
    +
    +    // Rerun socket stream will throw an exception, because it wrongly honors the recovered offsets.
    +    val exception = intercept[StreamingQueryException](
    +      runSocketStream(checkpointDir, true, Seq(("hello", "hello"), ("world", "world"))))
    +    assert(exception.getMessage.contains(
    +      "terminated with exception: Offsets committed out of order"))
    +  }
    +
    +  test("fail on recovery - false") {
    +    val checkpointDir = Files.createTempDirectory("checkpoint").toFile.getAbsolutePath
    +    runSocketStream(checkpointDir, false, Seq(("hello", "hello"), ("world", "world")))
    +
    +    // Rerun socket stream will not throw exception
    +    runSocketStream(checkpointDir, false, Seq(("hello", "hello"), ("world", "world")))
    +  }
    +
    +  private def runSocketStream(
    +      chkpointDir: String,
    +      failOnRecovery: Boolean,
    +      inputAndRets: Seq[(String, String)]): Unit = withSQLConf(
    +    "spark.sql.streaming.unsupportedOperationCheck" -> "false") {
    +      serverThread = new ServerThread()
    +      serverThread.start()
    +
    +      try {
    +        val ref = spark
    +        import ref.implicits._
    +        val socket = spark
    +          .readStream
    +          .format("socket")
    +          .options(Map("host" -> "localhost", "port" -> serverThread.port.toString))
    +          .option("failonrecovery", failOnRecovery.toString)
    +          .load()
    +          .as[String]
    +
    +        val actions = Seq(StartStream(checkpointLocation = chkpointDir)) ++
    +          inputAndRets.flatMap {
    +            case (input, ret) => Seq(AddSocketData(input), CheckLastBatch(ret))
    +          } ++ Seq(StopStream)
    +        testStream(socket)(actions : _*)
    +      } finally {
    +        if (serverThread != null) {
    +          serverThread.interrupt()
    +          serverThread.join()
    +          serverThread = null
    --- End diff --
    
    Nit: the `serverThread = null` assignment is not needed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org