You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/12/17 05:01:50 UTC
spark git commit: [SPARK-12390] Clean up unused serializer parameter
in BlockManager
Repository: spark
Updated Branches:
refs/heads/master d1508dd9b -> 97678edea
[SPARK-12390] Clean up unused serializer parameter in BlockManager
No change in functionality is intended. This only changes internal API.
Author: Andrew Or <an...@databricks.com>
Closes #10343 from andrewor14/clean-bm-serializer.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97678ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97678ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97678ede
Branch: refs/heads/master
Commit: 97678edeaaafc19ea18d044233a952d2e2e89fbc
Parents: d1508dd
Author: Andrew Or <an...@databricks.com>
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Dec 16 20:01:47 2015 -0800
----------------------------------------------------------------------
.../org/apache/spark/storage/BlockManager.scala | 29 ++++++++------------
.../org/apache/spark/storage/DiskStore.scala | 10 -------
2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 540e1ec..6074fc5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
def dataSerializeStream(
blockId: BlockId,
outputStream: OutputStream,
- values: Iterator[Any],
- serializer: Serializer = defaultSerializer): Unit = {
+ values: Iterator[Any]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
- val ser = serializer.newInstance()
+ val ser = defaultSerializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
/** Serializes into a byte buffer. */
- def dataSerialize(
- blockId: BlockId,
- values: Iterator[Any],
- serializer: Serializer = defaultSerializer): ByteBuffer = {
+ def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
val byteStream = new ByteBufferOutputStream(4096)
- dataSerializeStream(blockId, byteStream, values, serializer)
+ dataSerializeStream(blockId, byteStream, values)
byteStream.toByteBuffer
}
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserialize(
- blockId: BlockId,
- bytes: ByteBuffer,
- serializer: Serializer = defaultSerializer): Iterator[Any] = {
+ def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
bytes.rewind()
- dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+ dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
}
/**
* Deserializes a InputStream into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
- def dataDeserializeStream(
- blockId: BlockId,
- inputStream: InputStream,
- serializer: Serializer = defaultSerializer): Iterator[Any] = {
+ def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
val stream = new BufferedInputStream(inputStream)
- serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+ defaultSerializer
+ .newInstance()
+ .deserializeStream(wrapForCompression(blockId, stream))
+ .asIterator
}
def stop(): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/97678ede/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
- /**
- * A version of getValues that allows a custom serializer. This is used as part of the
- * shuffle short-circuit code.
- */
- def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
- // TODO: Should bypass getBytes and use a stream based implementation, so that
- // we won't use a lot of memory during e.g. external sort merge.
- getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
- }
-
override def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
if (file.exists()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org