Posted to commits@spark.apache.org by we...@apache.org on 2017/03/21 03:51:37 UTC

spark git commit: [SPARK-17204][CORE] Fix replicated off heap storage

Repository: spark
Updated Branches:
  refs/heads/master 0ec1db547 -> 7fa116f8f


[SPARK-17204][CORE] Fix replicated off heap storage

(Jira: https://issues.apache.org/jira/browse/SPARK-17204)

## What changes were proposed in this pull request?

There are a couple of bugs in the `BlockManager` related to its support for replicated off-heap storage. First, the locally stored off-heap byte buffer is disposed of when it is replicated; it should not be. Second, the replica byte buffers are stored as heap byte buffers instead of direct byte buffers even when the storage level's memory mode is off-heap. This PR addresses both of these problems.
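
To illustrate the second fix, here is a minimal standalone sketch (not the actual `BlockManager` code, which uses `ChunkedByteBuffer.copy` with `Platform.allocateDirectBuffer`, as shown in the diff below): when the target memory mode is off-heap, heap-backed chunks are copied into direct buffers before the bytes are handed to the memory store.

```scala
import java.nio.ByteBuffer

// Sketch: ensure every chunk lives off-heap by copying heap chunks into direct buffers.
def toDirectChunks(chunks: Array[ByteBuffer]): Array[ByteBuffer] =
  chunks.map { chunk =>
    if (chunk.isDirect) {
      chunk
    } else {
      val direct = ByteBuffer.allocateDirect(chunk.remaining())
      direct.put(chunk.duplicate()) // duplicate() leaves the source buffer's position untouched
      direct.flip()
      direct
    }
  }
```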

## How was this patch tested?

`BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It now fails if either of the bugs fixed by this PR is present.
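
The core of the new check is roughly the following (a simplified, self-contained sketch; the actual assertions appear in the test diff below): every chunk of a serialized block held in the memory store should be direct exactly when the storage level uses off-heap memory.

```scala
import java.nio.ByteBuffer

// Sketch of the invariant the enhanced suite verifies for serialized in-memory blocks.
def assertMemoryModeMatches(useOffHeap: Boolean, chunks: Seq[ByteBuffer]): Unit =
  chunks.foreach { chunk =>
    assert(useOffHeap == chunk.isDirect,
      s"chunk directness ${chunk.isDirect} does not match useOffHeap = $useOffHeap")
  }
```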

Author: Michael Allman <mi...@videoamp.com>

Closes #16499 from mallman/spark-17204-replicated_off_heap_storage.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fa116f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fa116f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fa116f8

Branch: refs/heads/master
Commit: 7fa116f8fc77906202217c0cd2f9718a4e62632b
Parents: 0ec1db5
Author: Michael Allman <mi...@videoamp.com>
Authored: Tue Mar 21 11:51:22 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Mar 21 11:51:22 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 23 +++++++--
 .../org/apache/spark/storage/StorageUtils.scala | 52 +++++++++++++++++---
 .../spark/util/ByteBufferInputStream.scala      |  8 +--
 .../spark/util/io/ChunkedByteBuffer.scala       | 27 ++++++++--
 .../storage/BlockManagerReplicationSuite.scala  | 20 ++++++--
 5 files changed, 105 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7fa116f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 45b7338..245d94a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -317,6 +317,9 @@ private[spark] class BlockManager(
 
   /**
    * Put the block locally, using the given storage level.
+   *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
    */
   override def putBlockData(
       blockId: BlockId,
@@ -755,6 +758,9 @@ private[spark] class BlockManager(
   /**
    * Put a new block of serialized bytes to the block manager.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
    * @param encrypt If true, asks the block manager to encrypt the data block before storing,
    *                when I/O encryption is enabled. This is required for blocks that have been
    *                read from unencrypted sources, since all the BlockManager read APIs
@@ -773,7 +779,7 @@ private[spark] class BlockManager(
       if (encrypt && securityManager.ioEncryptionKey.isDefined) {
         try {
           val data = bytes.toByteBuffer
-          val in = new ByteBufferInputStream(data, true)
+          val in = new ByteBufferInputStream(data)
           val byteBufOut = new ByteBufferOutputStream(data.remaining())
           val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
             securityManager.ioEncryptionKey.get)
@@ -800,6 +806,9 @@ private[spark] class BlockManager(
    *
    * If the block already exists, this method will not overwrite it.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
    * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
@@ -843,7 +852,15 @@ private[spark] class BlockManager(
               false
           }
         } else {
-          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
+          val memoryMode = level.memoryMode
+          memoryStore.putBytes(blockId, size, memoryMode, () => {
+            if (memoryMode == MemoryMode.OFF_HEAP &&
+                bytes.chunks.exists(buffer => !buffer.isDirect)) {
+              bytes.copy(Platform.allocateDirectBuffer)
+            } else {
+              bytes
+            }
+          })
         }
         if (!putSucceeded && level.useDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
@@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
           try {
             replicate(blockId, bytesToReplicate, level, remoteClassTag)
           } finally {
-            bytesToReplicate.dispose()
+            bytesToReplicate.unmap()
           }
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))

http://git-wip-us.apache.org/repos/asf/spark/blob/7fa116f8/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index e12f2e6..5efdd23 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -236,22 +236,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+  // Ewwww... Reflection!!! See the unmap method for justification
+  private val memoryMappedBufferFileDescriptorField = {
+    val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
+    val fdField = mappedBufferClass.getDeclaredField("fd")
+    fdField.setAccessible(true)
+    fdField
+  }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
+   * API that will cause errors if one attempts to read from the disposed buffer. However, neither
+   * the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put
+   * pressure on the garbage collector. Waiting for garbage collection may lead to the depletion of
+   * off-heap memory or huge numbers of open files. There's unfortunately no standard API to
+   * manually dispose of these kinds of buffers.
+   *
+   * See also [[unmap]]
    */
   def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      logTrace(s"Disposing of $buffer")
+      cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+    }
+  }
+
+  /**
+   * Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
+   * cause errors if one attempts to read from the unmapped buffer. However, the file descriptors of
+   * memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
+   * collection may lead to huge numbers of open files. There's unfortunately no standard API to
+   * manually unmap memory-mapped buffers.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      // Note that direct buffers are instances of MappedByteBuffer. As things stand in Java 8, the
+      // JDK does not provide a public API to distinguish between direct buffers and memory-mapped
+      // buffers. As an alternative, we peek beneath the curtains and look for a non-null file
+      // descriptor in mappedByteBuffer
+      if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
+        logTrace(s"Unmapping $buffer")
+        cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
       }
     }
   }
 
+  private def cleanDirectBuffer(buffer: DirectBuffer) = {
+    val cleaner = buffer.cleaner()
+    if (cleaner != null) {
+      cleaner.clean()
+    }
+  }
+
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
    * This method overwrites the old values stored in the RDDInfo's.

http://git-wip-us.apache.org/repos/asf/spark/blob/7fa116f8/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
index dce2ac6..50dc948 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -23,11 +23,10 @@ import java.nio.ByteBuffer
 import org.apache.spark.storage.StorageUtils
 
 /**
- * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
- * at the end of the stream (e.g. to close a memory-mapped file).
+ * Reads data from a ByteBuffer.
  */
 private[spark]
-class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
+class ByteBufferInputStream(private var buffer: ByteBuffer)
   extends InputStream {
 
   override def read(): Int = {
@@ -72,9 +71,6 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
    */
   private def cleanUp() {
     if (buffer != null) {
-      if (dispose) {
-        StorageUtils.dispose(buffer)
-      }
       buffer = null
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7fa116f8/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 7572cac..1667516 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -86,7 +86,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Copy this buffer into a new ByteBuffer.
+   * Convert this buffer to a ByteBuffer. If this buffer is backed by a single chunk, its underlying
+   * data will not be copied. Instead, it will be duplicated. If this buffer is backed by multiple
+   * chunks, the data underlying this buffer will be copied into a new byte buffer. As a result, it
+   * is suggested to use this method only if the caller does not need to manage the memory
+   * underlying this buffer.
    *
    * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
    */
@@ -132,10 +136,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up any ByteBuffer in this ChunkedByteBuffer which is direct or memory-mapped.
+   * See [[StorageUtils.dispose]] for more information.
+   *
+   * See also [[unmap]]
    */
   def dispose(): Unit = {
     if (!disposed) {
@@ -143,6 +147,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
       disposed = true
     }
   }
+
+  /**
+   * Attempt to unmap any ByteBuffer in this ChunkedByteBuffer if it is memory-mapped. See
+   * [[StorageUtils.unmap]] for more information.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(): Unit = {
+    if (!disposed) {
+      chunks.foreach(StorageUtils.unmap)
+      disposed = true
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7fa116f8/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 75dc040..d907add 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -374,7 +374,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       // Put the block into one of the stores
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
-      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      val testValue = Array.fill[Byte](blockSize)(1)
+      stores(0).putSingle(blockId, testValue, storageLevel)
 
       // Assert that master know two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -386,12 +387,23 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
         testStore => blockLocations.contains(testStore.blockManagerId.executorId)
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
-        assert(
-          testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
-        testStore.releaseLock(blockId)
+        val blockResultOpt = testStore.getLocalValues(blockId)
+        assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName")
+        val localValues = blockResultOpt.get.data.toSeq
+        assert(localValues.size == 1)
+        assert(localValues.head === testValue)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")
 
+        val memoryStore = testStore.memoryStore
+        if (memoryStore.contains(blockId) && !storageLevel.deserialized) {
+          memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer =>
+            assert(storageLevel.useOffHeap == byteBuffer.isDirect,
+              s"memory mode ${storageLevel.memoryMode} is not compatible with " +
+                byteBuffer.getClass.getSimpleName)
+          }
+        }
+
         val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
 
         // Assert that block status in the master for this store has expected storage level

