Posted to commits@spark.apache.org by an...@apache.org on 2015/12/17 05:01:50 UTC

spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager

Repository: spark
Updated Branches:
  refs/heads/master d1508dd9b -> 97678edea


[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended; this only changes internal APIs (a sketch of the pattern follows below).

Author: Andrew Or <an...@databricks.com>

Closes #10343 from andrewor14/clean-bm-serializer.
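
For readers skimming the diff below: the cleanup removes a defaulted serializer parameter that no remaining caller ever overrode, so each method now reads the defaultSerializer field directly. A minimal, self-contained Scala sketch of that pattern; the Store* and StringSerializer names are illustrative stand-ins, not Spark's actual classes:

trait Serializer { def serialize(v: Any): Array[Byte] }

class StringSerializer extends Serializer {
  def serialize(v: Any): Array[Byte] = v.toString.getBytes("UTF-8")
}

// Before: a defaulted parameter that every call site left at its default.
class StoreBefore(defaultSerializer: Serializer) {
  def dataSerialize(value: Any, serializer: Serializer = defaultSerializer): Array[Byte] =
    serializer.serialize(value)
}

// After: the unused parameter is dropped and the field is read directly.
class StoreAfter(defaultSerializer: Serializer) {
  def dataSerialize(value: Any): Array[Byte] =
    defaultSerializer.serialize(value)
}

object Demo extends App {
  val ser = new StringSerializer
  // Same bytes either way: every caller already relied on the default,
  // so dropping the parameter cannot change behavior.
  assert(new StoreBefore(ser).dataSerialize(42)
    .sameElements(new StoreAfter(ser).dataSerialize(42)))
  println("identical output; only the signature changed")
}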


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97678ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97678ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97678ede

Branch: refs/heads/master
Commit: 97678edeaaafc19ea18d044233a952d2e2e89fbc
Parents: d1508dd
Author: Andrew Or <an...@databricks.com>
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Dec 16 20:01:47 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 29 ++++++++------------
 .../org/apache/spark/storage/DiskStore.scala    | 10 -------
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 540e1ec..6074fc5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     byteStream.toByteBuffer
   }
 
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
   * Deserializes an InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {

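The hunk above leaves dataSerialize and dataDeserialize as two-argument methods that always go through defaultSerializer. A hedged round-trip sketch of that simplified surface, substituting plain Java serialization for Spark's pluggable serializer; SketchManager and its internals are hypothetical, not Spark's code:

import java.io._
import java.nio.ByteBuffer

class SketchManager {
  def dataSerialize(values: Iterator[Any]): ByteBuffer = {
    val bytes = new ByteArrayOutputStream(4096)
    val out = new ObjectOutputStream(new BufferedOutputStream(bytes))
    // Stand-in for defaultSerializer: plain Java serialization.
    values.foreach(v => out.writeObject(v.asInstanceOf[AnyRef]))
    out.close() // flushes the buffered stream
    ByteBuffer.wrap(bytes.toByteArray)
  }

  def dataDeserialize(buffer: ByteBuffer): Iterator[Any] = {
    val in = new ObjectInputStream(
      new BufferedInputStream(new ByteArrayInputStream(
        buffer.array(), buffer.position(), buffer.remaining())))
    // Read until end of stream, closing the input when exhausted.
    Iterator.continually {
      try Some(in.readObject())
      catch { case _: EOFException => in.close(); None }
    }.takeWhile(_.isDefined).flatten
  }
}

object RoundTrip extends App {
  val bm = new SketchManager
  val buf = bm.dataSerialize(Iterator("a", "b", "c"))
  println(bm.dataDeserialize(buf).toList) // List(a, b, c)
}

As in the real diff, the serializer is fixed per manager instance rather than chosen per call.
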
http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {

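With the custom-serializer overload removed, the surviving getValues (the context line at the top of this hunk) is the only read path, and it always deserializes through the block manager's default serializer. A small illustrative sketch of that surviving shape; BlockId and DiskStoreSketch here are stand-ins, not Spark's types:

import java.nio.ByteBuffer

final case class BlockId(name: String)

class DiskStoreSketch(
    getBytes: BlockId => Option[ByteBuffer],
    dataDeserialize: (BlockId, ByteBuffer) => Iterator[Any]) {
  // No serializer parameter: deserialization always goes through the
  // block manager's default serializer, matching the diff above.
  def getValues(blockId: BlockId): Option[Iterator[Any]] =
    getBytes(blockId).map(buffer => dataDeserialize(blockId, buffer))
}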
