You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/03/26 04:07:59 UTC

spark git commit: [SPARK-14109][SQL] Fix HDFSMetadataLog to fallback from FileContext to FileSystem API

Repository: spark
Updated Branches:
  refs/heads/master 24587ce43 -> 13945dd83


[SPARK-14109][SQL] Fix HDFSMetadataLog to fallback from FileContext to FileSystem API

## What changes were proposed in this pull request?

HDFSMetadataLog uses the newer FileContext API to achieve atomic renaming. However, FileContext implementations may not exist for many schemes for which there are FileSystem implementations. In those cases, rather than failing completely, we should fall back to the FileSystem-based implementation and log a warning that there may be file consistency issues if the log directory is concurrently modified.

In addition, I have added more tests to increase code coverage.

## How was this patch tested?

Unit test.
Tested on cluster with custom file system.

Author: Tathagata Das <ta...@gmail.com>

Closes #11925 from tdas/SPARK-14109.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13945dd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13945dd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13945dd8

Branch: refs/heads/master
Commit: 13945dd83bfa47ebd05181bda5a7c3e412feb5c0
Parents: 24587ce
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Mar 25 20:07:54 2016 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Mar 25 20:07:54 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 179 +++++++++++++++++--
 .../streaming/HDFSMetadataLogSuite.scala        | 122 ++++++++++++-
 .../spark/sql/test/SharedSQLContext.scala       |   5 +-
 .../apache/spark/sql/test/TestSQLContext.scala  |   8 +-
 4 files changed, 288 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index f27d23b..9663fee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{FileNotFoundException, IOException}
 import java.nio.ByteBuffer
-import java.util.{ConcurrentModificationException, EnumSet}
+import java.util.{ConcurrentModificationException, EnumSet, UUID}
 
 import scala.reflect.ClassTag
 
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 
@@ -32,6 +33,7 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SQLContext
 
+
 /**
  * A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
  * as the metadata storage.
@@ -47,17 +49,13 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   extends MetadataLog[T]
   with Logging {
 
-  private val metadataPath = new Path(path)
+  import HDFSMetadataLog._
 
-  private val fc =
-    if (metadataPath.toUri.getScheme == null) {
-      FileContext.getFileContext(sqlContext.sparkContext.hadoopConfiguration)
-    } else {
-      FileContext.getFileContext(metadataPath.toUri, sqlContext.sparkContext.hadoopConfiguration)
-    }
+  private val metadataPath = new Path(path)
+  private val fileManager = createFileManager()
 
-  if (!fc.util().exists(metadataPath)) {
-    fc.mkdir(metadataPath, FsPermission.getDirDefault, true)
+  if (!fileManager.exists(metadataPath)) {
+    fileManager.mkdirs(metadataPath)
   }
 
   /**
@@ -104,10 +102,9 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
     // Use nextId to create a temp file
     var nextId = 0
     while (true) {
-      val tempPath = new Path(metadataPath, s".${batchId}_$nextId.tmp")
-      fc.deleteOnExit(tempPath)
+      val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
-        val output = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE))
+        val output = fileManager.create(tempPath)
         try {
           output.write(bytes)
         } finally {
@@ -117,7 +114,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
           // Try to commit the batch
           // It will fail if there is an existing file (someone has committed the batch)
           logDebug(s"Attempting to write log #${batchFile(batchId)}")
-          fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
+          fileManager.rename(tempPath, batchFile(batchId))
           return
         } catch {
           case e: IOException if isFileAlreadyExistsException(e) =>
@@ -147,6 +144,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
           // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
           nextId += 1
+      } finally {
+        fileManager.delete(tempPath)
       }
     }
   }
@@ -160,8 +159,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
 
   override def get(batchId: Long): Option[T] = {
     val batchMetadataFile = batchFile(batchId)
-    if (fc.util().exists(batchMetadataFile)) {
-      val input = fc.open(batchMetadataFile)
+    if (fileManager.exists(batchMetadataFile)) {
+      val input = fileManager.open(batchMetadataFile)
       val bytes = IOUtils.toByteArray(input)
       Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
     } else {
@@ -171,7 +170,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   }
 
   override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
-    val files = fc.util().listStatus(metadataPath, batchFilesFilter)
+    val files = fileManager.list(metadataPath, batchFilesFilter)
     val batchIds = files
       .map(_.getPath.getName.toLong)
       .filter { batchId =>
@@ -184,7 +183,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   }
 
   override def getLatest(): Option[(Long, T)] = {
-    val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
       .map(_.getPath.getName.toLong)
       .sorted
       .reverse
@@ -196,4 +195,148 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
     }
     None
   }
+
+  private def createFileManager(): FileManager = {
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    try { // preferred: FileContext-based manager
+      new FileContextManager(metadataPath, hadoopConf)
+    } catch {
+      case e: UnsupportedFileSystemException => // no FileContext for this path; use FileSystem
+        logWarning("Could not use FileContext API for managing metadata log file. The log may be" +
+          " inconsistent under failures.", e)
+        new FileSystemManager(metadataPath, hadoopConf)
+    }
+  }
+}
+
+object HDFSMetadataLog {
+
+  /** A simple trait to abstract out the file management operations needed by HDFSMetadataLog. */
+  trait FileManager {
+
+    /** List the files in a path that match a filter. */
+    def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+    /** Make directory at the given path and all its parent directories as needed. */
+    def mkdirs(path: Path): Unit
+
+    /** Return whether the path exists. */
+    def exists(path: Path): Boolean
+
+    /** Open a file for reading, or throw exception if it does not exist. */
+    def open(path: Path): FSDataInputStream
+
+    /** Create a file at path, or throw an exception if it already exists. */
+    def create(path: Path): FSDataOutputStream
+
+    /**
+     * Atomically rename path, or throw exception if it cannot be done.
+     * Should throw FileNotFoundException if srcPath does not exist.
+     * Should throw FileAlreadyExistsException if destPath already exists.
+     */
+    def rename(srcPath: Path, destPath: Path): Unit
+
+    /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+    def delete(path: Path): Unit
+  }
+
+  /**
+   * Default implementation of FileManager using the newer FileContext API.
+   */
+  class FileContextManager(path: Path, hadoopConf: Configuration) extends FileManager {
+    private val fc = if (path.toUri.getScheme == null) {
+      FileContext.getFileContext(hadoopConf)
+    } else {
+      FileContext.getFileContext(path.toUri, hadoopConf)
+    }
+
+    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+      fc.util.listStatus(path, filter)
+    }
+
+    override def rename(srcPath: Path, destPath: Path): Unit = {
+      fc.rename(srcPath, destPath)
+    }
+
+    override def mkdirs(path: Path): Unit = {
+      fc.mkdir(path, FsPermission.getDirDefault, true)
+    }
+
+    override def open(path: Path): FSDataInputStream = {
+      fc.open(path)
+    }
+
+    override def create(path: Path): FSDataOutputStream = {
+      fc.create(path, EnumSet.of(CreateFlag.CREATE))
+    }
+
+    override def exists(path: Path): Boolean = {
+      fc.util().exists(path)
+    }
+
+    override def delete(path: Path): Unit = {
+      try {
+        fc.delete(path, true)
+      } catch {
+        case e: FileNotFoundException =>
+          // ignore if file has already been deleted
+      }
+    }
+  }
+
+  /**
+   * Implementation of FileManager using the older FileSystem API. Note that this implementation
+   * cannot provide atomic renaming of paths, hence can lead to consistency issues. This
+   * should be used only as a backup option, when FileContextManager cannot be used.
+   */
+  class FileSystemManager(path: Path, hadoopConf: Configuration) extends FileManager {
+    private val fs = path.getFileSystem(hadoopConf)
+
+    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+      fs.listStatus(path, filter)
+    }
+
+    /**
+     * Rename a path. Note that this implementation is not atomic.
+     * @throws FileNotFoundException if source path does not exist.
+     * @throws FileAlreadyExistsException if destination path already exists.
+     * @throws IOException if renaming fails for some unknown reason.
+     */
+    override def rename(srcPath: Path, destPath: Path): Unit = {
+      if (!fs.exists(srcPath)) {
+        throw new FileNotFoundException(s"Source path does not exist: $srcPath")
+      }
+      if (fs.exists(destPath)) {
+        throw new FileAlreadyExistsException(s"Destination path already exists: $destPath")
+      }
+      if (!fs.rename(srcPath, destPath)) {
+        throw new IOException(s"Failed to rename $srcPath to $destPath")
+      }
+    }
+
+    override def mkdirs(path: Path): Unit = {
+      fs.mkdirs(path, FsPermission.getDirDefault)
+    }
+
+    override def open(path: Path): FSDataInputStream = {
+      fs.open(path)
+    }
+
+    override def create(path: Path): FSDataOutputStream = {
+      fs.create(path, false)
+    }
+
+    override def exists(path: Path): Boolean = {
+      fs.exists(path)
+    }
+
+    override def delete(path: Path): Unit = {
+      try {
+        fs.delete(path, true)
+      } catch {
+        case e: FileNotFoundException =>
+          // ignore if file has already been deleted
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9ed5686..d5db9db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -17,21 +17,48 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{File, FileNotFoundException, IOException}
+import java.net.URI
 import java.util.ConcurrentModificationException
 
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.scalatest.concurrent.AsyncAssertions._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.execution.streaming.FakeFileSystem._
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
+  /** To avoid caching of FS objects */
+  override protected val sparkConf =
+    new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
+
   private implicit def toOption[A](a: A): Option[A] = Option(a)
 
-  test("basic") {
+  test("FileManager: FileContextManager") {
     withTempDir { temp =>
-      val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+      val path = new Path(temp.getAbsolutePath)
+      testManager(path, new FileContextManager(path, new Configuration))
+    }
+  }
+
+  test("FileManager: FileSystemManager") {
+    withTempDir { temp =>
+      val path = new Path(temp.getAbsolutePath)
+      testManager(path, new FileSystemManager(path, new Configuration))
+    }
+  }
+
+  test("HDFSMetadataLog: basic") {
+    withTempDir { temp =>
+      val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
+      val metadataLog = new HDFSMetadataLog[String](sqlContext, dir.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
       assert(metadataLog.getLatest() === Some(0 -> "batch0"))
       assert(metadataLog.get(0) === Some("batch0"))
@@ -53,7 +80,27 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("restart") {
+  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+    sqlContext.sparkContext.hadoopConfiguration.set(
+      s"fs.$scheme.impl",
+      classOf[FakeFileSystem].getName)
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+      assert(metadataLog.add(0, "batch0"))
+      assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+      assert(metadataLog.get(0) === Some("batch0"))
+      assert(metadataLog.get(None, 0) === Array(0 -> "batch0"))
+
+
+      val metadataLog2 = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+      assert(metadataLog2.get(0) === Some("batch0"))
+      assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
+      assert(metadataLog2.get(None, 0) === Array(0 -> "batch0"))
+
+    }
+  }
+
+  test("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -71,7 +118,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("metadata directory collision") {
+  test("HDFSMetadataLog: metadata directory collision") {
     withTempDir { temp =>
       val waiter = new Waiter
       val maxBatchId = 100
@@ -102,4 +149,69 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString)))
     }
   }
+
+
+  def testManager(basePath: Path, fm: FileManager): Unit = {
+    // Mkdirs: creates nested directories; calling it again on an existing dir must not throw
+    val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+    assert(!fm.exists(dir))
+    fm.mkdirs(dir)
+    assert(fm.exists(dir))
+    fm.mkdirs(dir)
+
+    // List: the filter decides which children of basePath are returned
+    val acceptAllFilter = new PathFilter {
+      override def accept(path: Path): Boolean = true
+    }
+    val rejectAllFilter = new PathFilter {
+      override def accept(path: Path): Boolean = false
+    }
+    assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
+    assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+    // Create: succeeds for a new file, throws IOException if the file already exists
+    val path = new Path(s"$dir/file")
+    assert(!fm.exists(path))
+    fm.create(path).close()
+    assert(fm.exists(path))
+    intercept[IOException] {
+      fm.create(path)
+    }
+
+    // Open and delete (close the opened stream so its handle is not leaked before deleting)
+    fm.open(path).close()
+    fm.delete(path)
+    assert(!fm.exists(path))
+    intercept[IOException] {
+      fm.open(path)
+    }
+    fm.delete(path)  // should not throw exception
+
+    // Rename: fails on a missing source or an already-existing destination
+    val path1 = new Path(s"$dir/file1")
+    val path2 = new Path(s"$dir/file2")
+    fm.create(path1).close()
+    assert(fm.exists(path1))
+    fm.rename(path1, path2)
+    intercept[FileNotFoundException] {
+      fm.rename(path1, path2)
+    }
+    val path3 = new Path(s"$dir/file3")
+    fm.create(path3).close()
+    assert(fm.exists(path3))
+    intercept[FileAlreadyExistsException] {
+      fm.rename(path2, path3)
+    }
+  }
+}
+
+/** RawLocalFileSystem under a fake URI scheme, to test HDFSMetadataLog's FileSystem fallback. */
+class FakeFileSystem extends RawLocalFileSystem {
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+}
+
+object FakeFileSystem {
+  val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}" // random suffix, fixed at init
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index c341191..914c6a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.test
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
 
 
@@ -25,6 +26,8 @@ import org.apache.spark.sql.SQLContext
  */
 trait SharedSQLContext extends SQLTestUtils {
 
+  protected val sparkConf = new SparkConf()
+
   /**
    * The [[TestSQLContext]] to use for all tests in this suite.
    *
@@ -44,7 +47,7 @@ trait SharedSQLContext extends SQLTestUtils {
   protected override def beforeAll(): Unit = {
     SQLContext.clearSqlListener()
     if (_ctx == null) {
-      _ctx = new TestSQLContext
+      _ctx = new TestSQLContext(sparkConf)
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()

http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index b3e146f..7ab79b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -26,9 +26,13 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { self =>
 
-  def this() {
+  def this(sparkConf: SparkConf) {
     this(new SparkContext("local[2]", "test-sql-context",
-      new SparkConf().set("spark.sql.testkey", "true")))
+      sparkConf.set("spark.sql.testkey", "true")))
+  }
+
+  def this() {
+    this(new SparkConf)
   }
 
   @transient


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org