You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/20 18:36:45 UTC

[GitHub] brkyvz commented on a change in pull request #23733: [SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars

brkyvz commented on a change in pull request #23733: [SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars
URL: https://github.com/apache/spark/pull/23733#discussion_r258620475
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ##########
 @@ -915,6 +918,189 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("special characters in checkpoint path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk @#chk")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1)
+      val q = inputData.toDF()
+        .writeStream
+        .format("noop")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start()
+      try {
+        q.processAllAvailable()
+        assert(checkpointDir.listFiles().toList.nonEmpty)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  /**
+   * Copy the checkpoint generated by Spark 2.4.0 from test resource to `dir` to set up a legacy
+   * streaming checkpoint.
+   */
+  private def setUp2dot4dot0Checkpoint(dir: File): Unit = {
+    val input = getClass.getResource("/structured-streaming/escaped-path-2.4.0")
+    assert(input != null, "cannot find test resource '/structured-streaming/escaped-path-2.4.0'")
+    val inputDir = new File(input.toURI)
+
+    // Copy test files to tempDir so that we won't modify the original data.
+    FileUtils.copyDirectory(inputDir, dir)
+
+    // Spark 2.4 and earlier escaped the _spark_metadata path once
+    val legacySparkMetadataDir = new File(
+      dir,
+      new Path("output %@#output/_spark_metadata").toUri.toString)
+
+    // Migrate from legacy _spark_metadata directory to the new _spark_metadata directory.
+    // Ideally we should copy "_spark_metadata" directly, as the user is supposed to do when
+    // migrating to the new version. However, in our test, "tempDir" will be different in each run
+    // and we need to fix the absolute path in the metadata to match "tempDir".
+    val sparkMetadata = FileUtils.readFileToString(new File(legacySparkMetadataDir, "0"), "UTF-8")
+    FileUtils.write(
+      new File(legacySparkMetadataDir, "0"),
+      sparkMetadata.replaceAll("TEMPDIR", dir.getCanonicalPath),
+      "UTF-8")
+  }
+
+  test("detect escaped path and report the migration guide") {
+    // Assert that the error message contains the migration conf, path and the legacy path.
+    def assertMigrationError(errorMessage: String, path: File, legacyPath: File): Unit = {
+      Seq(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key,
+          path.getCanonicalPath,
+          legacyPath.getCanonicalPath).foreach { msg =>
+        assert(errorMessage.contains(msg))
+      }
+    }
+
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+
+      // Here are the paths we will use to create the query
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+      val sparkMetadataDir = new File(tempDir, "output %@#output/_spark_metadata")
+
+      // The escaped paths used by Spark 2.4 and earlier.
+      // Spark 2.4 and earlier escaped the checkpoint path three times
+      val legacyCheckpointDir = new File(
+        tempDir,
+        new Path(new Path(new Path("chk %@#chk").toUri.toString).toUri.toString).toUri.toString)
+      // Spark 2.4 and earlier escaped the _spark_metadata path once
+      val legacySparkMetadataDir = new File(
+        tempDir,
+        new Path("output %@#output/_spark_metadata").toUri.toString)
+
+      // Reading a file sink output in a batch query should detect the legacy _spark_metadata
+      // directory and throw an error
+      val e = intercept[SparkException] {
+        spark.read.load(outputDir.getCanonicalPath).as[Int]
+      }
+      assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy _spark_metadata directory and throw
+      // an error
+      val inputData = MemoryStream[Int]
+      val e2 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+      // Move "_spark_metadata" to fix the file sink and test the checkpoint path.
+      FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy checkpoint path and throw an error
+      val e3 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
+
+      // Fix the checkpoint path and verify that the user can mitigate the issue by moving files.
+      FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+
+      val q = inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+      try {
+        q.processAllAvailable()
+        // Check the query id to make sure it did use the checkpoint
+        assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+
+        // Verify that the batch query can read "_spark_metadata" correctly after migration.
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  test("ignore the escaped path check when the flag is off") {
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "false") {
+        // Verify that the batch query ignores the legacy "_spark_metadata"
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(!(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex"))
 
 Review comment:
   These are amazing tests 🥇 💯 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org