Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/02 06:00:20 UTC

spark git commit: [SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

Repository: spark
Updated Branches:
  refs/heads/master 1bbf9ff63 -> 620da3b48


[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects when the underlying filesystem leaves such a CRC file behind after a batch is committed and deletes it.
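
For background, Hadoop's ChecksumFileSystem (which the local filesystem implementation extends) stores each file's checksum in a hidden sidecar named `.<fileName>.crc` in the same directory. The following is a minimal standalone sketch of the cleanup idea; the directory and file names are assumptions for illustration, not the PR's actual code (that is in the diff below):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: paths below are hypothetical.
val fs: FileSystem = FileSystem.getLocal(new Configuration())

val tempPath  = new Path("/tmp/metadata/0.tmp")  // temp file written for batch 0
val finalPath = new Path("/tmp/metadata/0")      // committed batch file

// Commit the batch by renaming the temp file into place.
fs.rename(tempPath, finalPath)

// ChecksumFileSystem names its sidecar ".<fileName>.crc" next to the file it guards.
// If the rename left the temp file's sidecar behind, remove it so the log directory
// contains only batch files.
val staleCrc = new Path(tempPath.getParent, s".${tempPath.getName}.crc")
if (fs.exists(staleCrc)) {
  fs.delete(staleCrc, false)
}
```
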
## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem. Ran the entire regression suite.
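
A rough sketch of the kind of regression check the modified test performs; the directory path here is an assumption, and the real assertions (shown in the test diff below) use `metadataLog.metadataPath`:

```scala
import java.io.File

// After the earlier batches have been purged, the metadata directory should hold
// exactly one file, named "2" for the latest batch, with no leftover ".<n>.crc" sidecars.
val allFiles = new File("/tmp/metadata").listFiles().toSeq
assert(allFiles.size == 1)
assert(allFiles.head.getName == "2")
```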

Author: frreiss <fr...@us.ibm.com>

Closes #15027 from frreiss/fred-17475.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/620da3b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/620da3b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/620da3b4

Branch: refs/heads/master
Commit: 620da3b4828b3580c7ed7339b2a07938e6be1bb1
Parents: 1bbf9ff
Author: frreiss <fr...@us.ibm.com>
Authored: Tue Nov 1 23:00:17 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 23:00:17 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5 +++++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala   | 6 ++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/620da3b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320..9a0f87c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
           // It will fail if there is an existing file (someone has committed the batch)
           logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
           fileManager.rename(tempPath, batchIdToPath(batchId))
+
+          // SPARK-17475: HDFSMetadataLog should not leak CRC files
+          // If the underlying filesystem didn't rename the CRC file, delete it.
+          val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+          if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
           return
         } catch {
           case e: IOException if isFileAlreadyExistsException(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/620da3b4/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26d..d03e08d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(metadataLog.get(1).isEmpty)
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
+
+      // There should be exactly one file, called "2", in the metadata directory.
+      // This check also tests for regressions of SPARK-17475
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size == 1)
+      assert(allFiles(0).getName() == "2")
     }
   }
 

