You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/04/28 21:18:59 UTC

spark git commit: [SPARK-5932] [CORE] Use consistent naming for size properties

Repository: spark
Updated Branches:
  refs/heads/master 8aab94d89 -> 2d222fb39


[SPARK-5932] [CORE] Use consistent naming for size properties

I've added an interface to JavaUtils to do byte conversion and added hooks within Utils.scala to handle conversion within Spark code (like for time strings). I've added matching tests for size conversion, and then updated all deprecated configs and documentation as per SPARK-5933.

Author: Ilya Ganelin <il...@capitalone.com>

Closes #5574 from ilganeli/SPARK-5932 and squashes the following commits:

11f6999 [Ilya Ganelin] Nit fixes
49a8720 [Ilya Ganelin] Whitespace fix
2ab886b [Ilya Ganelin] Scala style
fc85733 [Ilya Ganelin] Got rid of floating point math
852a407 [Ilya Ganelin] [SPARK-5932] Added much improved overflow handling. Can now handle sizes up to Long.MAX_VALUE Petabytes instead of being capped at Long.MAX_VALUE Bytes
9ee779c [Ilya Ganelin] Simplified fraction matches
22413b1 [Ilya Ganelin] Made MAX private
3dfae96 [Ilya Ganelin] Fixed some nits. Added automatic conversion of old paramter for kryoserializer.mb to new values.
e428049 [Ilya Ganelin] resolving merge conflict
8b43748 [Ilya Ganelin] Fixed error in pattern matching for doubles
84a2581 [Ilya Ganelin] Added smoother handling of fractional values for size parameters. This now throws an exception and added a warning for old spark.kryoserializer.buffer
d3d09b6 [Ilya Ganelin] [SPARK-5932] Fixing error in KryoSerializer
fe286b4 [Ilya Ganelin] Resolved merge conflict
c7803cd [Ilya Ganelin] Empty lines
54b78b4 [Ilya Ganelin] Simplified byteUnit class
69e2f20 [Ilya Ganelin] Updates to code
f32bc01 [Ilya Ganelin] [SPARK-5932] Fixed error in API in SparkConf.scala where Kb conversion wasn't being done properly (was Mb). Added test cases for both timeUnit and ByteUnit conversion
f15f209 [Ilya Ganelin] Fixed conversion of kryo buffer size
0f4443e [Ilya Ganelin]     Merge remote-tracking branch 'upstream/master' into SPARK-5932
35a7fa7 [Ilya Ganelin] Minor formatting
928469e [Ilya Ganelin] [SPARK-5932] Converted some longs to ints
5d29f90 [Ilya Ganelin] [SPARK-5932] Finished documentation updates
7a6c847 [Ilya Ganelin] [SPARK-5932] Updated spark.shuffle.file.buffer
afc9a38 [Ilya Ganelin] [SPARK-5932] Updated spark.broadcast.blockSize and spark.storage.memoryMapThreshold
ae7e9f6 [Ilya Ganelin] [SPARK-5932] Updated spark.io.compression.snappy.block.size
2d15681 [Ilya Ganelin] [SPARK-5932] Updated spark.executor.logs.rolling.size.maxBytes
1fbd435 [Ilya Ganelin] [SPARK-5932] Updated spark.broadcast.blockSize
eba4de6 [Ilya Ganelin] [SPARK-5932] Updated spark.shuffle.file.buffer.kb
b809a78 [Ilya Ganelin] [SPARK-5932] Updated spark.kryoserializer.buffer.max
0cdff35 [Ilya Ganelin] [SPARK-5932] Updated to use bibibytes in method names. Updated spark.kryoserializer.buffer.mb and spark.reducer.maxMbInFlight
475370a [Ilya Ganelin] [SPARK-5932] Simplified ByteUnit code, switched to using longs. Updated docs to clarify that we use kibi, mebi etc instead of kilo, mega
851d691 [Ilya Ganelin] [SPARK-5932] Updated memoryStringToMb to use new interfaces
a9f4fcf [Ilya Ganelin] [SPARK-5932] Added unit tests for unit conversion
747393a [Ilya Ganelin] [SPARK-5932] Added unit tests for ByteString conversion
09ea450 [Ilya Ganelin] [SPARK-5932] Added byte string conversion to Jav utils
5390fd9 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5932
db9a963 [Ilya Ganelin] Closing second spark context
1dc0444 [Ilya Ganelin] Added ref equality check
8c884fa [Ilya Ganelin] Made getOrCreate synchronized
cb0c6b7 [Ilya Ganelin] Doc updates and code cleanup
270cfe3 [Ilya Ganelin] [SPARK-6703] Documentation fixes
15e8dea [Ilya Ganelin] Updated comments and added MiMa Exclude
0e1567c [Ilya Ganelin] Got rid of unecessary option for AtomicReference
dfec4da [Ilya Ganelin] Changed activeContext to AtomicReference
733ec9f [Ilya Ganelin] Fixed some bugs in test code
8be2f83 [Ilya Ganelin] Replaced match with if
e92caf7 [Ilya Ganelin] [SPARK-6703] Added test to ensure that getOrCreate both allows creation, retrieval, and a second context if desired
a99032f [Ilya Ganelin] Spacing fix
d7a06b8 [Ilya Ganelin] Updated SparkConf class to add getOrCreate method. Started test suite implementation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d222fb3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d222fb3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d222fb3

Branch: refs/heads/master
Commit: 2d222fb39dd978e5a33cde6ceb59307cbdf7b171
Parents: 8aab94d
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Tue Apr 28 12:18:55 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Apr 28 12:18:55 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  90 +++++++++++++++-
 .../spark/broadcast/TorrentBroadcast.scala      |   3 +-
 .../org/apache/spark/io/CompressionCodec.scala  |   8 +-
 .../spark/serializer/KryoSerializer.scala       |  17 +--
 .../spark/shuffle/FileShuffleBlockManager.scala |   3 +-
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |   3 +-
 .../org/apache/spark/storage/DiskStore.scala    |   3 +-
 .../scala/org/apache/spark/util/Utils.scala     |  53 ++++++---
 .../util/collection/ExternalAppendOnlyMap.scala |   6 +-
 .../spark/util/collection/ExternalSorter.scala  |   4 +-
 .../util/logging/RollingFileAppender.scala      |   2 +-
 .../org/apache/spark/DistributedSuite.scala     |   2 +-
 .../scala/org/apache/spark/SparkConfSuite.scala |  19 ++++
 .../KryoSerializerResizableOutputSuite.scala    |   8 +-
 .../spark/serializer/KryoSerializerSuite.scala  |   2 +-
 .../storage/BlockManagerReplicationSuite.scala  |   2 +-
 .../spark/storage/BlockManagerSuite.scala       |   6 +-
 .../org/apache/spark/util/UtilsSuite.scala      | 100 ++++++++++++++++-
 docs/configuration.md                           |  60 ++++++-----
 docs/tuning.md                                  |   2 +-
 .../spark/examples/mllib/MovieLensALS.scala     |   2 +-
 .../org/apache/spark/network/util/ByteUnit.java |  67 ++++++++++++
 .../apache/spark/network/util/JavaUtils.java    | 107 +++++++++++++++++--
 23 files changed, 488 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index c1996e0..a8fc90a 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -211,7 +211,74 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     Utils.timeStringAsMs(get(key, defaultValue))
   }
 
+  /**
+   * Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then bytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsBytes(key: String): Long = {
+    Utils.byteStringAsBytes(get(key))
+  }
+
+  /**
+   * Get a size parameter as bytes, falling back to a default if not set. If no
+   * suffix is provided then bytes are assumed.
+   */
+  def getSizeAsBytes(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsBytes(get(key, defaultValue))
+  }
+  
+  /**
+   * Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then Kibibytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsKb(key: String): Long = {
+    Utils.byteStringAsKb(get(key))
+  }
+
+  /**
+   * Get a size parameter as Kibibytes, falling back to a default if not set. If no
+   * suffix is provided then Kibibytes are assumed.
+   */
+  def getSizeAsKb(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsKb(get(key, defaultValue))
+  }
+  
+  /**
+   * Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then Mebibytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsMb(key: String): Long = {
+    Utils.byteStringAsMb(get(key))
+  }
+
+  /**
+   * Get a size parameter as Mebibytes, falling back to a default if not set. If no
+   * suffix is provided then Mebibytes are assumed.
+   */
+  def getSizeAsMb(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsMb(get(key, defaultValue))
+  }
+  
+  /**
+   * Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
+   * suffix is provided then Gibibytes are assumed.
+   * @throws NoSuchElementException
+   */
+  def getSizeAsGb(key: String): Long = {
+    Utils.byteStringAsGb(get(key))
+  }
 
+  /**
+   * Get a size parameter as Gibibytes, falling back to a default if not set. If no
+   * suffix is provided then Gibibytes are assumed.
+   */
+  def getSizeAsGb(key: String, defaultValue: String): Long = {
+    Utils.byteStringAsGb(get(key, defaultValue))
+  }
+  
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
     Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
@@ -407,7 +474,13 @@ private[spark] object SparkConf extends Logging {
         "The spark.cache.class property is no longer being used! Specify storage levels using " +
         "the RDD.persist() method instead."),
       DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
-        "Please use spark.{driver,executor}.userClassPathFirst instead."))
+        "Please use spark.{driver,executor}.userClassPathFirst instead."),
+      DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
+        "Please use spark.kryoserializer.buffer instead. The default value for " +
+          "spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
+          "are no longer accepted. To specify the equivalent now, one may use '64k'.")
+    )
+    
     Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
   }
 
@@ -432,6 +505,21 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
         // Translate old value to a duration, with 10s wait time per try.
         translation = s => s"${s.toLong * 10}s")),
+    "spark.reducer.maxSizeInFlight" -> Seq(
+      AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
+    "spark.kryoserializer.buffer" ->
+        Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4", 
+          translation = s => s"${s.toDouble * 1000}k")),
+    "spark.kryoserializer.buffer.max" -> Seq(
+      AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
+    "spark.shuffle.file.buffer" -> Seq(
+      AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
+    "spark.executor.logs.rolling.maxSize" -> Seq(
+      AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
+    "spark.io.compression.snappy.blockSize" -> Seq(
+      AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
+    "spark.io.compression.lz4.blockSize" -> Seq(
+      AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
     "spark.rpc.numRetries" -> Seq(
       AlternateConfig("spark.akka.num.retries", "1.4")),
     "spark.rpc.retry.wait" -> Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 23b02e6..a0c9b5e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     } else {
       None
     }
-    blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
+    // Note: use getSizeAsKb (not bytes) to maintain compatiblity if no units are provided
+    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
   }
   setConf(SparkEnv.get.conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0709b6d..0756cdb 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -97,7 +97,7 @@ private[spark] object CompressionCodec {
 /**
  * :: DeveloperApi ::
  * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
- * Block size can be configured by `spark.io.compression.lz4.block.size`.
+ * Block size can be configured by `spark.io.compression.lz4.blockSize`.
  *
  * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
  *       of Spark. This is intended for use as an internal compression utility within a single Spark
@@ -107,7 +107,7 @@ private[spark] object CompressionCodec {
 class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
+    val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
     new LZ4BlockOutputStream(s, blockSize)
   }
 
@@ -137,7 +137,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 /**
  * :: DeveloperApi ::
  * Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
- * Block size can be configured by `spark.io.compression.snappy.block.size`.
+ * Block size can be configured by `spark.io.compression.snappy.blockSize`.
  *
  * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
  *       of Spark. This is intended for use as an internal compression utility within a single Spark
@@ -153,7 +153,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
   }
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
+    val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
     new SnappyOutputStream(s, blockSize)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 579fb66..754832b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -49,16 +49,17 @@ class KryoSerializer(conf: SparkConf)
   with Logging
   with Serializable {
 
-  private val bufferSizeMb = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064)
-  if (bufferSizeMb >= 2048) {
-    throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " +
-      s"2048 mb, got: + $bufferSizeMb mb.")
+  private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
+  
+  if (bufferSizeKb >= 2048) {
+    throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
+      s"2048 mb, got: + $bufferSizeKb mb.")
   }
-  private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt
+  private val bufferSize = (bufferSizeKb * 1024).toInt
 
-  val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", 64)
+  val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
   if (maxBufferSizeMb >= 2048) {
-    throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " +
+    throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
       s"2048 mb, got: + $maxBufferSizeMb mb.")
   }
   private val maxBufferSize = maxBufferSizeMb * 1024 * 1024
@@ -173,7 +174,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
     } catch {
       case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
         throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
-          "increase spark.kryoserializer.buffer.max.mb value.")
+          "increase spark.kryoserializer.buffer.max value.")
     }
     ByteBuffer.wrap(output.toBytes)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 538e150..e9b4e2b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -78,7 +78,8 @@ class FileShuffleBlockManager(conf: SparkConf)
   private val consolidateShuffleFiles =
     conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided 
+  private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 7a2c5ae..80374ad 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -79,7 +79,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       blockManager,
       blocksByAddress,
       serializer,
-      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
+      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
+      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024) 
     val itr = blockFetcherItr.flatMap(unpackBlock)
 
     val completionIter = CompletionIterator[T, Iterator[T]](itr, {

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 4b232ae..1f45956 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -31,8 +31,7 @@ import org.apache.spark.util.Utils
 private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
   extends BlockStore(blockManager) with Logging {
 
-  val minMemoryMapBytes = blockManager.conf.getLong(
-    "spark.storage.memoryMapThreshold", 2 * 1024L * 1024L)
+  val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
   override def getSize(blockId: BlockId): Long = {
     diskManager.getFile(blockId.name).length

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 342bc9a..4c028c0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1020,21 +1020,48 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for internal use.
+   *
+   * If no suffix is provided, the passed number is assumed to be in bytes.
+   */
+  def byteStringAsBytes(str: String): Long = {
+    JavaUtils.byteStringAsBytes(str)
+  }
+
+  /**
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for internal use.
+   *
+   * If no suffix is provided, the passed number is assumed to be in kibibytes.
+   */
+  def byteStringAsKb(str: String): Long = {
+    JavaUtils.byteStringAsKb(str)
+  }
+
+  /**
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for internal use.
+   *
+   * If no suffix is provided, the passed number is assumed to be in mebibytes.
+   */
+  def byteStringAsMb(str: String): Long = {
+    JavaUtils.byteStringAsMb(str)
+  }
+
+  /**
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m, 500g) to gibibytes for internal use.
+   *
+   * If no suffix is provided, the passed number is assumed to be in gibibytes.
+   */
+  def byteStringAsGb(str: String): Long = {
+    JavaUtils.byteStringAsGb(str)
+  }
+
+  /**
+   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of mebibytes.
    */
   def memoryStringToMb(str: String): Int = {
-    val lower = str.toLowerCase
-    if (lower.endsWith("k")) {
-      (lower.substring(0, lower.length-1).toLong / 1024).toInt
-    } else if (lower.endsWith("m")) {
-      lower.substring(0, lower.length-1).toInt
-    } else if (lower.endsWith("g")) {
-      lower.substring(0, lower.length-1).toInt * 1024
-    } else if (lower.endsWith("t")) {
-      lower.substring(0, lower.length-1).toInt * 1024 * 1024
-    } else {// no suffix, so it's just a number in bytes
-      (lower.toLong / 1024 / 1024).toInt
-    }
+    // Convert to bytes, rather than directly to MB, because when no units are specified the unit
+    // is assumed to be bytes
+    (JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 30dd7f2..f912049 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -89,8 +89,10 @@ class ExternalAppendOnlyMap[K, V, C](
 
   // Number of bytes spilled in total
   private var _diskBytesSpilled = 0L
-
-  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+  
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
+  private val fileBufferSize = 
+    sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
   // Write metrics for current spill
   private var curWriteMetrics: ShuffleWriteMetrics = _

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 79a695f..ef3cac6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -108,7 +108,9 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+  
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
+  private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
   private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
 
   // Size of object batches when reading/writing from serializers.

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index e579421..7138b4b 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -138,7 +138,7 @@ private[spark] object RollingFileAppender {
   val STRATEGY_DEFAULT = ""
   val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
   val INTERVAL_DEFAULT = "daily"
-  val SIZE_PROPERTY = "spark.executor.logs.rolling.size.maxBytes"
+  val SIZE_PROPERTY = "spark.executor.logs.rolling.maxSize"
   val SIZE_DEFAULT = (1024 * 1024).toString
   val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
   val DEFAULT_BUFFER_SIZE = 8192

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 97ea357..96a9c20 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -77,7 +77,7 @@ class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {
   }
 
   test("groupByKey where map output sizes exceed maxMbInFlight") {
-    val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
+    val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "1m")
     sc = new SparkContext(clusterUrl, "test", conf)
     // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
     // file should be about 2.5 MB

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 272e6af..68d08e3 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -24,11 +24,30 @@ import scala.language.postfixOps
 import scala.util.{Try, Random}
 
 import org.scalatest.FunSuite
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
 import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
 import com.esotericsoftware.kryo.Kryo
 
 class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
+  test("Test byteString conversion") {
+    val conf = new SparkConf()
+    // Simply exercise the API, we don't need a complete conversion test since that's handled in
+    // UtilsSuite.scala
+    assert(conf.getSizeAsBytes("fake","1k") === ByteUnit.KiB.toBytes(1))
+    assert(conf.getSizeAsKb("fake","1k") === ByteUnit.KiB.toKiB(1))
+    assert(conf.getSizeAsMb("fake","1k") === ByteUnit.KiB.toMiB(1))
+    assert(conf.getSizeAsGb("fake","1k") === ByteUnit.KiB.toGiB(1))
+  }
+
+  test("Test timeString conversion") {
+    val conf = new SparkConf()
+    // Simply exercise the API, we don't need a complete conversion test since that's handled in
+    // UtilsSuite.scala
+    assert(conf.getTimeAsMs("fake","1ms") === TimeUnit.MILLISECONDS.toMillis(1))
+    assert(conf.getTimeAsSeconds("fake","1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
+  }
+
   test("loading from system properties") {
     System.setProperty("spark.test.testProperty", "2")
     val conf = new SparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
index 967c9e9..da98d09 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
@@ -33,8 +33,8 @@ class KryoSerializerResizableOutputSuite extends FunSuite {
   test("kryo without resizable output buffer should fail on large array") {
     val conf = new SparkConf(false)
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer.mb", "1")
-    conf.set("spark.kryoserializer.buffer.max.mb", "1")
+    conf.set("spark.kryoserializer.buffer", "1m")
+    conf.set("spark.kryoserializer.buffer.max", "1m")
     val sc = new SparkContext("local", "test", conf)
     intercept[SparkException](sc.parallelize(x).collect())
     LocalSparkContext.stop(sc)
@@ -43,8 +43,8 @@ class KryoSerializerResizableOutputSuite extends FunSuite {
   test("kryo with resizable output buffer should succeed on large array") {
     val conf = new SparkConf(false)
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer.mb", "1")
-    conf.set("spark.kryoserializer.buffer.max.mb", "2")
+    conf.set("spark.kryoserializer.buffer", "1m")
+    conf.set("spark.kryoserializer.buffer.max", "2m")
     val sc = new SparkContext("local", "test", conf)
     assert(sc.parallelize(x).collect() === x)
     LocalSparkContext.stop(sc)

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index b070a54..1b13559 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -269,7 +269,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
 
   test("serialization buffer overflow reporting") {
     import org.apache.spark.SparkException
-    val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max.mb"
+    val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
 
     val largeObject = (1 to 1000000).toArray
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index ffa5162..f647200 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -50,7 +50,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
   val allStores = new ArrayBuffer[BlockManager]
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  conf.set("spark.kryoserializer.buffer.mb", "1")
+  conf.set("spark.kryoserializer.buffer", "1m")
   val serializer = new KryoSerializer(conf)
 
   // Implicitly convert strings to BlockIds for test clarity.

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 7d82a7c..6957bc7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -55,7 +55,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
   val shuffleManager = new HashShuffleManager(conf)
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  conf.set("spark.kryoserializer.buffer.mb", "1")
+  conf.set("spark.kryoserializer.buffer", "1m")
   val serializer = new KryoSerializer(conf)
 
   // Implicitly convert strings to BlockIds for test clarity.
@@ -814,14 +814,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     // be nice to refactor classes involved in disk storage in a way that
     // allows for easier testing.
     val blockManager = mock(classOf[BlockManager])
-    when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
+    when(blockManager.conf).thenReturn(conf.clone.set(confKey, "0"))
     val diskBlockManager = new DiskBlockManager(blockManager, conf)
 
     val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
     diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
     val mapped = diskStoreMapped.getBytes(blockId).get
 
-    when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 1000).toString))
+    when(blockManager.conf).thenReturn(conf.clone.set(confKey, "1m"))
     val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
     diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
     val notMapped = diskStoreNotMapped.getBytes(blockId).get

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1ba9980..62a3cbc 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,7 +23,6 @@ import java.nio.{ByteBuffer, ByteOrder}
 import java.text.DecimalFormatSymbols
 import java.util.concurrent.TimeUnit
 import java.util.Locale
-import java.util.PriorityQueue
 
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
@@ -35,6 +34,7 @@ import org.scalatest.FunSuite
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.SparkConf
 
 class UtilsSuite extends FunSuite with ResetSystemProperties {
@@ -66,6 +66,10 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
 
     // Test invalid strings
     intercept[NumberFormatException] {
+      Utils.timeStringAsMs("600l")
+    }
+    
+    intercept[NumberFormatException] {
       Utils.timeStringAsMs("This breaks 600s")
     }
 
@@ -82,6 +86,100 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     }
   }
 
+  test("Test byteString conversion") {
+    // Test zero
+    assert(Utils.byteStringAsBytes("0") === 0)
+
+    assert(Utils.byteStringAsGb("1") === 1)
+    assert(Utils.byteStringAsGb("1g") === 1)
+    assert(Utils.byteStringAsGb("1023m") === 0)
+    assert(Utils.byteStringAsGb("1024m") === 1)
+    assert(Utils.byteStringAsGb("1048575k") === 0)
+    assert(Utils.byteStringAsGb("1048576k") === 1)
+    assert(Utils.byteStringAsGb("1k") === 0)
+    assert(Utils.byteStringAsGb("1t") === ByteUnit.TiB.toGiB(1))
+    assert(Utils.byteStringAsGb("1p") === ByteUnit.PiB.toGiB(1))
+    
+    assert(Utils.byteStringAsMb("1") === 1)
+    assert(Utils.byteStringAsMb("1m") === 1)
+    assert(Utils.byteStringAsMb("1048575b") === 0)
+    assert(Utils.byteStringAsMb("1048576b") === 1)
+    assert(Utils.byteStringAsMb("1023k") === 0)
+    assert(Utils.byteStringAsMb("1024k") === 1)
+    assert(Utils.byteStringAsMb("3645k") === 3)
+    assert(Utils.byteStringAsMb("1024gb") === 1048576)
+    assert(Utils.byteStringAsMb("1g") === ByteUnit.GiB.toMiB(1))
+    assert(Utils.byteStringAsMb("1t") === ByteUnit.TiB.toMiB(1))
+    assert(Utils.byteStringAsMb("1p") === ByteUnit.PiB.toMiB(1))
+
+    assert(Utils.byteStringAsKb("1") === 1)
+    assert(Utils.byteStringAsKb("1k") === 1)
+    assert(Utils.byteStringAsKb("1m") === ByteUnit.MiB.toKiB(1))
+    assert(Utils.byteStringAsKb("1g") === ByteUnit.GiB.toKiB(1))
+    assert(Utils.byteStringAsKb("1t") === ByteUnit.TiB.toKiB(1))
+    assert(Utils.byteStringAsKb("1p") === ByteUnit.PiB.toKiB(1))
+    
+    assert(Utils.byteStringAsBytes("1") === 1)
+    assert(Utils.byteStringAsBytes("1k") === ByteUnit.KiB.toBytes(1))
+    assert(Utils.byteStringAsBytes("1m") === ByteUnit.MiB.toBytes(1))
+    assert(Utils.byteStringAsBytes("1g") === ByteUnit.GiB.toBytes(1))
+    assert(Utils.byteStringAsBytes("1t") === ByteUnit.TiB.toBytes(1))
+    assert(Utils.byteStringAsBytes("1p") === ByteUnit.PiB.toBytes(1))
+
+    // Overflow handling, 1073741824p exceeds Long.MAX_VALUE if converted straight to Bytes
+    // This demonstrates that we can have e.g 1024^3 PB without overflowing. 
+    assert(Utils.byteStringAsGb("1073741824p") === ByteUnit.PiB.toGiB(1073741824))
+    assert(Utils.byteStringAsMb("1073741824p") === ByteUnit.PiB.toMiB(1073741824))
+    
+    // Run this to confirm it doesn't throw an exception
+    assert(Utils.byteStringAsBytes("9223372036854775807") === 9223372036854775807L) 
+    assert(ByteUnit.PiB.toPiB(9223372036854775807L) === 9223372036854775807L)
+    
+    // Test overflow exception
+    intercept[IllegalArgumentException] {
+      // This value exceeds Long.MAX when converted to bytes 
+      Utils.byteStringAsBytes("9223372036854775808")
+    }
+
+    // Test overflow exception
+    intercept[IllegalArgumentException] {
+      // This value exceeds Long.MAX when converted to TB
+      ByteUnit.PiB.toTiB(9223372036854775807L)
+    }
+    
+    // Test fractional string
+    intercept[NumberFormatException] {
+      Utils.byteStringAsMb("0.064")
+    }
+    
+    // Test fractional string
+    intercept[NumberFormatException] {
+      Utils.byteStringAsMb("0.064m")
+    }
+    
+    // Test invalid strings
+    intercept[NumberFormatException] {
+      Utils.byteStringAsBytes("500ub")
+    }
+    
+    // Test invalid strings
+    intercept[NumberFormatException] {
+      Utils.byteStringAsBytes("This breaks 600b")
+    }
+
+    intercept[NumberFormatException] {
+      Utils.byteStringAsBytes("This breaks 600")
+    }
+
+    intercept[NumberFormatException] {
+      Utils.byteStringAsBytes("600gb This breaks")
+    }
+    
+    intercept[NumberFormatException] {
+      Utils.byteStringAsBytes("This 123mb breaks")
+    }
+  }
+  
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")
     assert(Utils.bytesToString(1500) === "1500.0 B")

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index d587b91..72105fe 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -48,6 +48,17 @@ The following format is accepted:
     5d (days)
     1y (years)
     
+    
+Properties that specify a byte size should be configured with a unit of size.  
+The following format is accepted:
+
+    1b (bytes)
+    1k or 1kb (kibibytes = 1024 bytes)
+    1m or 1mb (mebibytes = 1024 kibibytes)
+    1g or 1gb (gibibytes = 1024 mebibytes)
+    1t or 1tb (tebibytes = 1024 gibibytes)
+    1p or 1pb (pebibytes = 1024 tebibytes)
+
 ## Dynamically Loading Spark Properties
 In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
 instance, if you'd like to run the same application with different masters or different
@@ -272,12 +283,11 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
+  <td><code>spark.executor.logs.rolling.maxSize</code></td>
   <td>(none)</td>
   <td>
     Set the max size of the file by which the executor logs will be rolled over.
-    Rolling is disabled by default. Value is set in terms of bytes.
-    See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+    Rolling is disabled by default. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
     for automatic cleaning of old logs.
   </td>
 </tr>
@@ -366,10 +376,10 @@ Apart from these, the following properties are also available, and may be useful
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
-  <td><code>spark.reducer.maxMbInFlight</code></td>
-  <td>48</td>
+  <td><code>spark.reducer.maxSizeInFlight</code></td>
+  <td>48m</td>
   <td>
-    Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since
+    Maximum size of map outputs to fetch simultaneously from each reduce task. Since
     each output requires us to create a buffer to receive it, this represents a fixed memory
     overhead per reduce task, so keep it small unless you have a large amount of memory.
   </td>
@@ -403,10 +413,10 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.file.buffer.kb</code></td>
-  <td>32</td>
+  <td><code>spark.shuffle.file.buffer</code></td>
+  <td>32k</td>
   <td>
-    Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers
+    Size of the in-memory buffer for each shuffle file output stream. These buffers
     reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
   </td>
 </tr>
@@ -582,18 +592,18 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.io.compression.lz4.block.size</code></td>
-  <td>32768</td>
+  <td><code>spark.io.compression.lz4.blockSize</code></td>
+  <td>32k</td>
   <td>
-    Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec
+    Block size used in LZ4 compression, in the case when LZ4 compression codec
     is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
   </td>
 </tr>
 <tr>
-  <td><code>spark.io.compression.snappy.block.size</code></td>
-  <td>32768</td>
+  <td><code>spark.io.compression.snappy.blockSize</code></td>
+  <td>32k</td>
   <td>
-    Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec
+    Block size used in Snappy compression, in the case when Snappy compression codec
     is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
   </td>
 </tr>
@@ -641,19 +651,19 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.kryoserializer.buffer.max.mb</code></td>
-  <td>64</td>
+  <td><code>spark.kryoserializer.buffer.max</code></td>
+  <td>64m</td>
   <td>
-    Maximum allowable size of Kryo serialization buffer, in megabytes. This must be larger than any
+    Maximum allowable size of Kryo serialization buffer. This must be larger than any
     object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception
     inside Kryo.
   </td>
 </tr>
 <tr>
-  <td><code>spark.kryoserializer.buffer.mb</code></td>
-  <td>0.064</td>
+  <td><code>spark.kryoserializer.buffer</code></td>
+  <td>64k</td>
   <td>
-    Initial size of Kryo's serialization buffer, in megabytes. Note that there will be one buffer
+    Initial size of Kryo's serialization buffer. Note that there will be one buffer
      <i>per core</i> on each worker. This buffer will grow up to
      <code>spark.kryoserializer.buffer.max.mb</code> if needed.
   </td>
@@ -698,9 +708,9 @@ Apart from these, the following properties are also available, and may be useful
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.broadcast.blockSize</code></td>
-  <td>4096</td>
+  <td>4m</td>
   <td>
-    Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
+    Size of each piece of a block for <code>TorrentBroadcastFactory</code>.
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is
     too small, <code>BlockManager</code> might take a performance hit.
   </td>
@@ -816,9 +826,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.storage.memoryMapThreshold</code></td>
-  <td>2097152</td>
+  <td>2m</td>
   <td>
-    Size of a block, in bytes, above which Spark memory maps when reading a block from disk.
+    Size of a block above which Spark memory maps when reading a block from disk.
     This prevents Spark from memory mapping very small blocks. In general, memory
     mapping has high overhead for blocks close to or below the page size of the operating system.
   </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/docs/tuning.md
----------------------------------------------------------------------
diff --git a/docs/tuning.md b/docs/tuning.md
index cbd2278..1cb223e 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -60,7 +60,7 @@ val sc = new SparkContext(conf)
 The [Kryo documentation](https://github.com/EsotericSoftware/kryo) describes more advanced
 registration options, such as adding custom serialization code.
 
-If your objects are large, you may also need to increase the `spark.kryoserializer.buffer.mb`
+If your objects are large, you may also need to increase the `spark.kryoserializer.buffer`
 config property. The default is 2, but this value needs to be large enough to hold the *largest*
 object you will serialize.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 0bc36ea..99588b0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -100,7 +100,7 @@ object MovieLensALS {
     val conf = new SparkConf().setAppName(s"MovieLensALS with $params")
     if (params.kryo) {
       conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating]))
-        .set("spark.kryoserializer.buffer.mb", "8")
+        .set("spark.kryoserializer.buffer", "8m")
     }
     val sc = new SparkContext(conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java b/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
new file mode 100644
index 0000000..36d6550
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.util;
+
+public enum ByteUnit {
+  BYTE (1),
+  KiB (1024L),
+  MiB ((long) Math.pow(1024L, 2L)),
+  GiB ((long) Math.pow(1024L, 3L)),
+  TiB ((long) Math.pow(1024L, 4L)),
+  PiB ((long) Math.pow(1024L, 5L));
+
+  private ByteUnit(long multiplier) {
+    this.multiplier = multiplier;
+  }
+
+  // Interpret the provided number (d) with suffix (u) as this unit type.
+  // E.g. KiB.interpret(1, MiB) interprets 1MiB as its KiB representation = 1024k
+  public long convertFrom(long d, ByteUnit u) {
+    return u.convertTo(d, this);
+  }
+  
+  // Convert the provided number (d) interpreted as this unit type to unit type (u). 
+  public long convertTo(long d, ByteUnit u) {
+    if (multiplier > u.multiplier) {
+      long ratio = multiplier / u.multiplier;
+      if (Long.MAX_VALUE / ratio < d) {
+        throw new IllegalArgumentException("Conversion of " + d + " exceeds Long.MAX_VALUE in "
+          + name() + ". Try a larger unit (e.g. MiB instead of KiB)");
+      }
+      return d * ratio;
+    } else {
+      // Perform operations in this order to avoid potential overflow 
+      // when computing d * multiplier
+      return d / (u.multiplier / multiplier);
+    }
+  }
+
+  public double toBytes(long d) {
+    if (d < 0) {
+      throw new IllegalArgumentException("Negative size value. Size must be positive: " + d);
+    }
+    return d * multiplier; 
+  }
+  
+  public long toKiB(long d) { return convertTo(d, KiB); }
+  public long toMiB(long d) { return convertTo(d, MiB); }
+  public long toGiB(long d) { return convertTo(d, GiB); }
+  public long toTiB(long d) { return convertTo(d, TiB); }
+  public long toPiB(long d) { return convertTo(d, PiB); }
+  
+  private final long multiplier;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2d222fb3/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index b6fbace..6b514aa 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -126,7 +126,7 @@ public class JavaUtils {
     return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
   }
 
-  private static ImmutableMap<String, TimeUnit> timeSuffixes = 
+  private static final ImmutableMap<String, TimeUnit> timeSuffixes = 
     ImmutableMap.<String, TimeUnit>builder()
       .put("us", TimeUnit.MICROSECONDS)
       .put("ms", TimeUnit.MILLISECONDS)
@@ -137,6 +137,21 @@ public class JavaUtils {
       .put("d", TimeUnit.DAYS)
       .build();
 
+  private static final ImmutableMap<String, ByteUnit> byteSuffixes =
+    ImmutableMap.<String, ByteUnit>builder()
+      .put("b", ByteUnit.BYTE)
+      .put("k", ByteUnit.KiB)
+      .put("kb", ByteUnit.KiB)
+      .put("m", ByteUnit.MiB)
+      .put("mb", ByteUnit.MiB)
+      .put("g", ByteUnit.GiB)
+      .put("gb", ByteUnit.GiB)
+      .put("t", ByteUnit.TiB)
+      .put("tb", ByteUnit.TiB)
+      .put("p", ByteUnit.PiB)
+      .put("pb", ByteUnit.PiB)
+      .build();
+
   /**
    * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
    * internal use. If no suffix is provided a direct conversion is attempted.
@@ -145,16 +160,14 @@ public class JavaUtils {
     String lower = str.toLowerCase().trim();
     
     try {
-      String suffix;
-      long val;
       Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
-      if (m.matches()) {
-        val = Long.parseLong(m.group(1));
-        suffix = m.group(2);
-      } else {
+      if (!m.matches()) {
         throw new NumberFormatException("Failed to parse time string: " + str);
       }
       
+      long val = Long.parseLong(m.group(1));
+      String suffix = m.group(2);
+      
       // Check for invalid suffixes
       if (suffix != null && !timeSuffixes.containsKey(suffix)) {
         throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
@@ -164,7 +177,7 @@ public class JavaUtils {
       return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
     } catch (NumberFormatException e) {
       String timeError = "Time must be specified as seconds (s), " +
-              "milliseconds (ms), microseconds (us), minutes (m or min) hour (h), or day (d). " +
+              "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +
               "E.g. 50s, 100ms, or 250us.";
       
       throw new NumberFormatException(timeError + "\n" + e.getMessage());
@@ -186,5 +199,83 @@ public class JavaUtils {
   public static long timeStringAsSec(String str) {
     return parseTimeString(str, TimeUnit.SECONDS);
   }
+  
+  /**
+   * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to a ByteUnit for
+   * internal use. If no suffix is provided a direct conversion of the provided default is 
+   * attempted.
+   */
+  private static long parseByteString(String str, ByteUnit unit) {
+    String lower = str.toLowerCase().trim();
+
+    try {
+      Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
+      Matcher fractionMatcher = Pattern.compile("([0-9]+\\.[0-9]+)([a-z]+)?").matcher(lower);
+      
+      if (m.matches()) {
+        long val = Long.parseLong(m.group(1));
+        String suffix = m.group(2);
+
+        // Check for invalid suffixes
+        if (suffix != null && !byteSuffixes.containsKey(suffix)) {
+          throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
+        }
+
+        // If suffix is valid use that, otherwise none was provided and use the default passed
+        return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);  
+      } else if (fractionMatcher.matches()) {
+        throw new NumberFormatException("Fractional values are not supported. Input was: " 
+          + fractionMatcher.group(1));
+      } else {
+        throw new NumberFormatException("Failed to parse byte string: " + str);  
+      }
+      
+    } catch (NumberFormatException e) {
+      String timeError = "Size must be specified as bytes (b), " +
+        "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). " +
+        "E.g. 50b, 100k, or 250m.";
 
+      throw new NumberFormatException(timeError + "\n" + e.getMessage());
+    }
+  }
+
+  /**
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for
+   * internal use.
+   * 
+   * If no suffix is provided, the passed number is assumed to be in bytes.
+   */
+  public static long byteStringAsBytes(String str) {
+    return parseByteString(str, ByteUnit.BYTE);
+  }
+
+  /**
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for
+   * internal use.
+   *
+   * If no suffix is provided, the passed number is assumed to be in kibibytes.
+   */
+  public static long byteStringAsKb(String str) {
+    return parseByteString(str, ByteUnit.KiB);
+  }
+  
+  /**
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for
+   * internal use.
+   *
+   * If no suffix is provided, the passed number is assumed to be in mebibytes.
+   */
+  public static long byteStringAsMb(String str) {
+    return parseByteString(str, ByteUnit.MiB);
+  }
+
+  /**
+   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to gibibytes for
+   * internal use.
+   *
+   * If no suffix is provided, the passed number is assumed to be in gibibytes.
+   */
+  public static long byteStringAsGb(String str) {
+    return parseByteString(str, ByteUnit.GiB);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org