You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/06/30 08:14:43 UTC
[spark] branch branch-3.0 updated: [SPARK-29999][SS][FOLLOWUP] Fix
test to check the actual metadata log directory
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 22e3433 [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory
22e3433 is described below
commit 22e34336da50220073d83768903726e619489942
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Tue Jun 30 08:09:18 2020 +0000
[SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory
### What changes were proposed in this pull request?
This patch fixes the missed spot - the test initializes FileStreamSinkLog with its "output" directory instead of "metadata" directory, hence the verification against the sink log was a no-op.
### Why are the changes needed?
Without the fix, the verification against the sink log was a no-op.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Checked with a debugger in the test, and verified `allFiles()` returns non-zero entries. (Previously it returned zero entries, as there was no metadata.)
Closes #28930 from HeartSaVioR/SPARK-29999-FOLLOWUP-fix-test.
Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 5472170a2b35864c617bdb846ff7123533765a16)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/streaming/FileStreamSink.scala | 19 +++++++++++--------
.../spark/sql/streaming/FileStreamSinkSuite.scala | 10 ++++++----
2 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b679f16..86a3194 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,8 +45,7 @@ object FileStreamSink extends Logging {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.isDirectory(hdfsPath)) {
- val metadataPath = new Path(hdfsPath, metadataDir)
- checkEscapedMetadataPath(fs, metadataPath, sqlConf)
+ val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
fs.exists(metadataPath)
} else {
false
@@ -55,6 +54,12 @@ object FileStreamSink extends Logging {
}
}
+ def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = {
+ val metadataDir = new Path(path, FileStreamSink.metadataDir)
+ FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf)
+ metadataDir
+ }
+
def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
&& StreamExecution.containsSpecialCharsInPath(metadataPath)) {
@@ -125,14 +130,12 @@ class FileStreamSink(
partitionColumnNames: Seq[String],
options: Map[String, String]) extends Sink with Logging {
+ import FileStreamSink._
+
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val basePath = new Path(path)
- private val logPath = {
- val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
- val fs = metadataDir.getFileSystem(hadoopConf)
- FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
- metadataDir
- }
+ private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
+ sparkSession.sessionState.conf)
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8779651..aa2664c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
}
}
- val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
- spark.sessionState.newHadoopConf())
- val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
- outputDir.getCanonicalPath)
+ val outputDirPath = new Path(outputDir.getCanonicalPath)
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val fs = outputDirPath.getFileSystem(hadoopConf)
+ val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)
+
+ val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
val allFiles = sinkLog.allFiles()
// only files from non-empty partition should be logged
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org