Posted to commits@spark.apache.org by we...@apache.org on 2020/06/30 08:14:43 UTC

[spark] branch branch-3.0 updated: [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 22e3433  [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory
22e3433 is described below

commit 22e34336da50220073d83768903726e619489942
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Tue Jun 30 08:09:18 2020 +0000

    [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory
    
    ### What changes were proposed in this pull request?
    
    This patch fixes a spot missed in the original patch: the test initialized `FileStreamSinkLog` with the sink's "output" directory instead of its "metadata" directory, so the verification against the sink log was a no-op.
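    
    In sketch form, the corrected wiring (identifiers as in the test diff below; `spark` and `conf` come from the surrounding test harness):
    
    ```scala
    // Resolve the sink's metadata directory (the "_spark_metadata"
    // subdirectory of the output path) instead of the output path itself.
    val outputDirPath = new Path(outputDir.getCanonicalPath)
    val fs = outputDirPath.getFileSystem(spark.sessionState.newHadoopConf())
    val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)
    
    // The log now reads the entries FileStreamSink actually wrote.
    val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
    ```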
    
    ### Why are the changes needed?
    
    Without the fix, the verification against the sink log is a no-op.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Checked with a debugger in the test and verified that `allFiles()` returns a non-zero number of entries. (Before the fix it returned zero entries, since the log was reading a path with no metadata.)
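    
    In sketch form, the check now exercised (the exact assertion in the suite may differ):
    
    ```scala
    val allFiles = sinkLog.allFiles()
    // With the log pointed at the metadata directory, entries are present;
    // before the fix this sequence was always empty.
    assert(allFiles.nonEmpty)
    ```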
    
    Closes #28930 from HeartSaVioR/SPARK-29999-FOLLOWUP-fix-test.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5472170a2b35864c617bdb846ff7123533765a16)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/streaming/FileStreamSink.scala      | 19 +++++++++++--------
 .../spark/sql/streaming/FileStreamSinkSuite.scala     | 10 ++++++----
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b679f16..86a3194 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,8 +45,7 @@ object FileStreamSink extends Logging {
         val hdfsPath = new Path(singlePath)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         if (fs.isDirectory(hdfsPath)) {
-          val metadataPath = new Path(hdfsPath, metadataDir)
-          checkEscapedMetadataPath(fs, metadataPath, sqlConf)
+          val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
           fs.exists(metadataPath)
         } else {
           false
@@ -55,6 +54,12 @@ object FileStreamSink extends Logging {
     }
   }
 
+  def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = {
+    val metadataDir = new Path(path, FileStreamSink.metadataDir)
+    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf)
+    metadataDir
+  }
+
   def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
     if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
         && StreamExecution.containsSpecialCharsInPath(metadataPath)) {
@@ -125,14 +130,12 @@ class FileStreamSink(
     partitionColumnNames: Seq[String],
     options: Map[String, String]) extends Sink with Logging {
 
+  import FileStreamSink._
+
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
-  private val logPath = {
-    val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
-    val fs = metadataDir.getFileSystem(hadoopConf)
-    FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
-    metadataDir
-  }
+  private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
+    sparkSession.sessionState.conf)
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8779651..aa2664c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
             }
           }
 
-          val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
-            spark.sessionState.newHadoopConf())
-          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
-            outputDir.getCanonicalPath)
+          val outputDirPath = new Path(outputDir.getCanonicalPath)
+          val hadoopConf = spark.sessionState.newHadoopConf()
+          val fs = outputDirPath.getFileSystem(hadoopConf)
+          val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)
+
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
 
           val allFiles = sinkLog.allFiles()
           // only files from non-empty partition should be logged
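
For reference, a minimal sketch of calling the extracted helper directly, assuming a `spark` session in scope (the output path here is illustrative; `getMetadataLogPath` is the method added above):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.FileStreamSink

// Illustrative output directory of a file stream sink.
val outputPath = new Path("/tmp/stream-output")
val fs = outputPath.getFileSystem(spark.sessionState.newHadoopConf())

// Resolves the _spark_metadata subdirectory under the output path,
// running the escaped-path check before returning it.
val logPath = FileStreamSink.getMetadataLogPath(fs, outputPath, spark.sessionState.conf)
```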


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org