Posted to commits@spark.apache.org by pw...@apache.org on 2015/05/21 07:33:08 UTC

spark git commit: [SPARK-7389] [CORE] Tachyon integration improvement

Repository: spark
Updated Branches:
  refs/heads/master d0eb9ffe9 -> 04940c497


[SPARK-7389] [CORE] Tachyon integration improvement

Two main changes:

1. Add two functions, putValues and getValues, to ExternalBlockManager,
because an implementation may not need to rely on putBytes and getBytes.

2. Improve the Tachyon integration. Currently, when putting data into
Tachyon, Spark first serializes all the data in a partition into a
ByteBuffer and then writes it into Tachyon; this uses a lot of memory and
increases GC overhead. When getting data from Tachyon, getValues depends on
getBytes, which likewise reads all the data into an on-heap byte array,
again resulting in high memory usage. This PR changes the approach of both
functions, making them read and write data via streams to reduce memory
usage.

In our testing, when the data size is large, this patch reduced GC time by
about 30% and full GC time by about 70%, and reduced total execution time by
about 10%.
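
To illustrate the streaming idea, here is a minimal sketch (not code from
this commit; JDK object streams stand in for Spark's Serializer, and the
function names are illustrative):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream, OutputStream}

    // Old approach: serialize the whole partition into one on-heap buffer,
    // then write that buffer out in a single call.
    def putBuffered(values: Iterator[AnyRef], out: OutputStream): Unit = {
      val buffer = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(buffer)
      values.foreach(oos.writeObject)
      oos.close()
      out.write(buffer.toByteArray) // whole partition on the heap at once
    }

    // New approach: serialize records directly to the destination stream,
    // so only one record at a time needs to be materialized.
    def putStreamed(values: Iterator[AnyRef], out: OutputStream): Unit = {
      val oos = new ObjectOutputStream(out)
      try values.foreach(oos.writeObject)
      finally oos.close()
    }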

Author: Mingfei <mi...@intel.com>

Closes #5908 from shimingfei/Tachyon-integration-rebase and squashes the following commits:

033bc57 [Mingfei] modify according to comments
747c69a [Mingfei] modify according to comments - format changes
ce52c67 [Mingfei] put close() in a finally block
d2c60bb [Mingfei] modify according to comments, some code style change
4c11591 [Mingfei] modify according to comments: split putIntoExternalBlockStore into two functions; add default implementation for getValues and putValues
cc0a32e [Mingfei] Make getValues read data from Tachyon by stream; make putValues write data to Tachyon by stream
017593d [Mingfei] add getValues and putValues in ExternalBlockManager's Interface


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04940c49
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04940c49
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04940c49

Branch: refs/heads/master
Commit: 04940c49755fd2e7f1ed7b875da287c946bfebeb
Parents: d0eb9ff
Author: Mingfei <mi...@intel.com>
Authored: Wed May 20 22:33:03 2015 -0700
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Wed May 20 22:33:03 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 36 +++++---
 .../spark/storage/ExternalBlockManager.scala    | 22 ++++-
 .../spark/storage/ExternalBlockStore.scala      | 88 ++++++++++++++------
 .../spark/storage/TachyonBlockManager.scala     | 51 ++++++++++--
 4 files changed, 149 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/04940c49/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 16d67cb..5048c7d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream, OutputStream}
+import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -489,16 +489,17 @@ private[spark] class BlockManager(
         if (level.useOffHeap) {
           logDebug(s"Getting block $blockId from ExternalBlockStore")
           if (externalBlockStore.contains(blockId)) {
-            externalBlockStore.getBytes(blockId) match {
-              case Some(bytes) =>
-                if (!asBlockResult) {
-                  return Some(bytes)
-                } else {
-                  return Some(new BlockResult(
-                    dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
-                }
+            val result = if (asBlockResult) {
+              externalBlockStore.getValues(blockId)
+                .map(new BlockResult(_, DataReadMethod.Memory, info.size))
+            } else {
+              externalBlockStore.getBytes(blockId)
+            }
+            result match {
+              case Some(values) =>
+                return result
               case None =>
-                logDebug(s"Block $blockId not found in externalBlockStore")
+                logDebug(s"Block $blockId not found in ExternalBlockStore")
             }
           }
         }
@@ -1206,8 +1207,19 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+  }
+
+  /**
+   * Deserializes an InputStream into an iterator of values and disposes of it when the end of
+   * the iterator is reached.
+   */
+  def dataDeserializeStream(
+      blockId: BlockId,
+      inputStream: InputStream,
+      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+    val stream = new BufferedInputStream(inputStream)
+    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
   }
 
   def stop(): Unit = {

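For context, a hedged sketch of how the new dataDeserializeStream helper can
be used (blockManager, blockId, and blockBytes are illustrative stand-ins
assumed to be in scope):

    import java.io.ByteArrayInputStream

    // Deserialize lazily from a stream instead of first materializing all
    // block bytes in a ByteBuffer.
    // blockBytes: Array[Byte] holding the block's serialized contents (assumed).
    val in = new ByteArrayInputStream(blockBytes)
    val values: Iterator[Any] = blockManager.dataDeserializeStream(blockId, in)
    values.foreach(println) // records are deserialized one at a time as consumed
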
http://git-wip-us.apache.org/repos/asf/spark/blob/04940c49/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
index 8964762..f39325a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -32,6 +32,8 @@ import java.nio.ByteBuffer
  */
 private[spark] abstract class ExternalBlockManager {
 
+  protected var blockManager: BlockManager = _
+
   override def toString: String = {"External Block Store"}
 
   /**
@@ -41,7 +43,9 @@ private[spark] abstract class ExternalBlockManager {
    *
    * @throws java.io.IOException if there is any file system failure during the initialization.
    */
-  def init(blockManager: BlockManager, executorId: String): Unit
+  def init(blockManager: BlockManager, executorId: String): Unit = {
+    this.blockManager = blockManager
+  }
 
   /**
   * Drop the block from the underlying external block store, if it exists.
@@ -73,6 +77,11 @@ private[spark] abstract class ExternalBlockManager {
    */
   def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
 
+  def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+    val bytes = blockManager.dataSerialize(blockId, values)
+    putBytes(blockId, bytes)
+  }
+
   /**
    * Retrieve the block bytes.
    * @return Some(ByteBuffer) if the block bytes is successfully retrieved
@@ -83,6 +92,17 @@ private[spark] abstract class ExternalBlockManager {
   def getBytes(blockId: BlockId): Option[ByteBuffer]
 
   /**
+   * Retrieve the block data.
+   * @return Some(Iterator[Any]) if the block data is successfully retrieved
+   *         None if the block does not exist in the external block store.
+   *
+   * @throws java.io.IOException if there is any file system failure in getting the block.
+   */
+  def getValues(blockId: BlockId): Option[Iterator[_]] = {
+    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+  }
+
+  /**
    * Get the size of the block saved in the underlying external block store,
    * which is saved before by putBytes.
    * @return size of the block

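With these defaults, ExternalBlockManager acts as a small template: a backend
only has to supply the byte-level primitives, and it inherits iterator-based
put/get that round-trip through the BlockManager's serializer. A trimmed,
hypothetical example (other abstract members omitted, hence the class stays
abstract):

    import java.nio.ByteBuffer
    import scala.collection.mutable

    // Hypothetical backend: implements only putBytes/getBytes and inherits
    // the default putValues/getValues, which serialize via blockManager.
    abstract class MapBackedBlockManager extends ExternalBlockManager {
      private val store = mutable.HashMap.empty[BlockId, ByteBuffer]

      override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit =
        store(blockId) = bytes

      override def getBytes(blockId: BlockId): Option[ByteBuffer] =
        store.get(blockId)
    }
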
http://git-wip-us.apache.org/repos/asf/spark/blob/04940c49/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
index 0bf7703..291394e 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.storage
 
 import java.nio.ByteBuffer
+
+import scala.util.control.NonFatal
+
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
-import scala.util.control.NonFatal
 
 
 /**
@@ -40,7 +42,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getSize from $blockId", t)
+        logError(s"Error in getSize($blockId)", t)
         0L
     }
   }
@@ -54,7 +56,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putIterator(blockId, values.toIterator, level, returnValues)
+    putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
   }
 
   override def putIterator(
@@ -62,42 +64,70 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    logDebug(s"Attempting to write values for block $blockId")
-    val bytes = blockManager.dataSerialize(blockId, values)
-    putIntoExternalBlockStore(blockId, bytes, returnValues)
+    putIntoExternalBlockStore(blockId, values, returnValues)
   }
 
   private def putIntoExternalBlockStore(
       blockId: BlockId,
-      bytes: ByteBuffer,
+      values: Iterator[_],
       returnValues: Boolean): PutResult = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val byteBuffer = bytes.duplicate()
-    byteBuffer.rewind()
-    logDebug(s"Attempting to put block $blockId into ExtBlk store")
+    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
     // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
     try {
       val startTime = System.currentTimeMillis
       if (externalBlockManager.isDefined) {
-        externalBlockManager.get.putBytes(blockId, bytes)
+        externalBlockManager.get.putValues(blockId, values)
+        val size = getSize(blockId)
+        val data = if (returnValues) {
+          Left(getValues(blockId).get)
+        } else {
+          null
+        }
         val finishTime = System.currentTimeMillis
         logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
-          blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
+          blockId, Utils.bytesToString(size), finishTime - startTime))
+        PutResult(size, data)
+      } else {
+        logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+      }
+    } catch {
+      case NonFatal(t) =>
+        logError(s"Error in putValues($blockId)", t)
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+    }
+  }
 
-        if (returnValues) {
-          PutResult(bytes.limit(), Right(bytes.duplicate()))
+  private def putIntoExternalBlockStore(
+      blockId: BlockId,
+      bytes: ByteBuffer,
+      returnValues: Boolean): PutResult = {
+    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
+    // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
+    try {
+      val startTime = System.currentTimeMillis
+      if (externalBlockManager.isDefined) {
+        val byteBuffer = bytes.duplicate()
+        byteBuffer.rewind()
+        externalBlockManager.get.putBytes(blockId, byteBuffer)
+        val size = bytes.limit()
+        val data = if (returnValues) {
+          Right(bytes)
         } else {
-          PutResult(bytes.limit(), null)
+          null
         }
+        val finishTime = System.currentTimeMillis
+        logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+          blockId, Utils.bytesToString(size), finishTime - startTime))
+        PutResult(size, data)
       } else {
-        logError(s"error in putBytes $blockId")
-        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+        logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
       }
     } catch {
       case NonFatal(t) =>
-        logError(s"error in putBytes $blockId", t)
-        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+        logError(s"Error in putBytes($blockId)", t)
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
     }
   }
 
@@ -107,13 +137,19 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
     } catch {
       case NonFatal(t) =>
-        logError(s"error in removing $blockId", t)
+        logError(s"Error in removeBlock($blockId)", t)
         true
     }
   }
 
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+    try {
+      externalBlockManager.flatMap(_.getValues(blockId))
+    } catch {
+      case NonFatal(t) =>
+        logError(s"Error in getValues($blockId)", t)
+        None
+    }
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -121,7 +157,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.flatMap(_.getBytes(blockId))
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getBytes from $blockId", t)
+        logError(s"Error in getBytes($blockId)", t)
         None
     }
   }
@@ -130,13 +166,13 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
     try {
       val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
       if (!ret) {
-        logInfo(s"remove block $blockId")
+        logInfo(s"Remove block $blockId")
         blockManager.removeBlock(blockId, true)
       }
       ret
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getBytes from $blockId", t)
+        logError(s"Error in getBytes($blockId)", t)
         false
     }
   }

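For reference, a sketch of the caller-visible contract after this change
(externalBlockStore, blockId, and records are assumed to be in scope; as the
diff shows, PutResult carries a size, an optional Either of values or bytes,
and any dropped blocks):

    // The iterator path now returns the values (Left), read back through
    // getValues, rather than a serialized buffer.
    val result: PutResult = externalBlockStore.putIterator(
      blockId, records.iterator, StorageLevel.OFF_HEAP, returnValues = true)

    result.data match {
      case Left(iter)    => iter.foreach(println) // values path (putValues/getValues)
      case Right(buffer) => println(buffer.limit) // bytes path (putBytes)
      case null          => ()                    // returnValues was false
    }
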
http://git-wip-us.apache.org/repos/asf/spark/blob/04940c49/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index bdc6276..fb4ba0e 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}
 
+import scala.util.control.NonFatal
+
 import com.google.common.io.ByteStreams
+
 import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
 import tachyon.TachyonURI
 
@@ -38,7 +41,6 @@ import org.apache.spark.util.Utils
  */
 private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
 
-  var blockManager: BlockManager =_
   var rootDirs: String = _
   var master: String = _
   var client: tachyon.client.TachyonFS = _
@@ -52,7 +54,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
 
 
   override def init(blockManager: BlockManager, executorId: String): Unit = {
-    this.blockManager = blockManager
+    super.init(blockManager, executorId)
     val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
     val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
 
@@ -95,8 +97,29 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
   override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
     val file = getFile(blockId)
     val os = file.getOutStream(WriteType.TRY_CACHE)
-    os.write(bytes.array())
-    os.close()
+    try {
+      os.write(bytes.array())
+    } catch {
+      case NonFatal(e) => 
+        logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
+        os.cancel()
+    } finally {
+      os.close()
+    }
+  }
+
+  override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+    val file = getFile(blockId)
+    val os = file.getOutStream(WriteType.TRY_CACHE)
+    try {
+      blockManager.dataSerializeStream(blockId, os, values)
+    } catch {
+      case NonFatal(e) => 
+        logWarning(s"Failed to put values of block $blockId into Tachyon", e)
+        os.cancel()
+    } finally {
+      os.close()
+    }
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -105,21 +128,31 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    assert (is != null)
     try {
       val size = file.length
       val bs = new Array[Byte](size.asInstanceOf[Int])
       ByteStreams.readFully(is, bs)
       Some(ByteBuffer.wrap(bs))
     } catch {
-      case ioe: IOException =>
-        logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+      case NonFatal(e) =>
+        logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
         None
     } finally {
       is.close()
     }
   }
 
+  override def getValues(blockId: BlockId): Option[Iterator[_]] = {
+    val file = getFile(blockId)
+    if (file == null || file.getLocationHosts().size() == 0) {
+      return None
+    }
+    val is = file.getInStream(ReadType.CACHE)
+    Option(is).map { is =>
+      blockManager.dataDeserializeStream(blockId, is)
+    }
+  }
+
   override def getSize(blockId: BlockId): Long = {
     getFile(blockId.name).length
   }
@@ -184,7 +217,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
             tachyonDir = client.getFile(path)
           }
         } catch {
-          case e: Exception =>
+          case NonFatal(e) =>
             logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
         }
       }
@@ -206,7 +239,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
           Utils.deleteRecursively(tachyonDir, client)
         }
       } catch {
-        case e: Exception =>
+        case NonFatal(e) =>
           logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
       }
     }

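The Tachyon write path above follows a cancel-on-failure, close-in-finally
shape. A generic sketch of that pattern (hypothetical stream trait, not the
Tachyon client API):

    import scala.util.control.NonFatal

    // Hypothetical output stream whose cancel() discards partial output.
    trait CancellableOutStream {
      def write(bytes: Array[Byte]): Unit
      def cancel(): Unit
      def close(): Unit
    }

    def writeBlock(os: CancellableOutStream, bytes: Array[Byte]): Unit = {
      try {
        os.write(bytes)
      } catch {
        case NonFatal(_) =>
          os.cancel() // drop the partially written block rather than keep it
      } finally {
        os.close()    // always release the stream, even after cancel()
      }
    }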
