Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/04 23:03:38 UTC
git commit: Merge pull request #95 from aarondav/perftest
Updated Branches:
refs/heads/branch-0.8 daaaee175 -> 31da065b1
Merge pull request #95 from aarondav/perftest
Minor: Put StoragePerfTester in org/apache/
(cherry picked from commit a51359c917a9ebe379b32ebc53fd093c454ea195)
Signed-off-by: Reynold Xin <rx...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/31da065b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/31da065b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/31da065b
Branch: refs/heads/branch-0.8
Commit: 31da065b1d08c1fad5283e4bcf8e0ed01818c03e
Parents: daaaee1
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Oct 21 20:33:29 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Dec 4 14:01:13 2013 -0800
----------------------------------------------------------------------
.../spark/storage/StoragePerfTester.scala | 84 ++++++++++++++++++++
.../scala/spark/storage/StoragePerfTester.scala | 84 --------------------
2 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31da065b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
new file mode 100644
index 0000000..68893a2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.storage
+
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CountDownLatch, Executors}
+
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
+
+/** Utility for micro-benchmarking shuffle write performance.
+ *
+ * Writes simulated shuffle output from several threads and records the observed throughput*/
+object StoragePerfTester {
+ def main(args: Array[String]) = {
+ /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+ val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
+
+ /** Number of map tasks. All tasks execute concurrently. */
+ val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
+
+ /** Number of reduce splits for each map task. */
+ val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
+
+ val recordLength = 1000 // ~1KB records
+ val totalRecords = dataSizeMb * 1000
+ val recordsPerMap = totalRecords / numMaps
+
+ val writeData = "1" * recordLength
+ val executor = Executors.newFixedThreadPool(numMaps)
+
+ System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.shuffle.sync", "true")
+
+ // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
+ val sc = new SparkContext("local[4]", "Write Tester")
+ val blockManager = sc.env.blockManager
+
+ def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+ val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+ new KryoSerializer())
+ val writers = shuffle.writers
+ for (i <- 1 to recordsPerMap) {
+ writers(i % numOutputSplits).write(writeData)
+ }
+ writers.map {w =>
+ w.commit()
+ total.addAndGet(w.fileSegment().length)
+ w.close()
+ }
+
+ shuffle.releaseWriters(true)
+ }
+
+ val start = System.currentTimeMillis()
+ val latch = new CountDownLatch(numMaps)
+ val totalBytes = new AtomicLong()
+ for (task <- 1 to numMaps) {
+ executor.submit(new Runnable() {
+ override def run() = {
+ try {
+ writeOutputBytes(task, totalBytes)
+ latch.countDown()
+ } catch {
+ case e: Exception =>
+ println("Exception in child thread: " + e + " " + e.getMessage)
+ System.exit(1)
+ }
+ }
+ })
+ }
+ latch.await()
+ val end = System.currentTimeMillis()
+ val time = (end - start) / 1000.0
+ val bytesPerSecond = totalBytes.get() / time
+ val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+ System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+ System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+ System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
+
+ executor.shutdown()
+ sc.stop()
+ }
+}
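
The concurrency pattern the tester uses above is a fixed-size thread pool fanned
out over a CountDownLatch, with an AtomicLong accumulating the bytes written
across workers. Below is a minimal self-contained sketch of that pattern in
plain Scala, with no Spark dependency; the simulateWrite workload and all sizes
in it are invented for illustration and are not taken from the commit.

----------------------------------------------------------------------
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

object LatchFanOutSketch {
  def main(args: Array[String]): Unit = {
    val numWorkers = 8                        // mirrors NUM_MAPS above
    val executor   = Executors.newFixedThreadPool(numWorkers)
    val latch      = new CountDownLatch(numWorkers)
    val totalBytes = new AtomicLong()

    // Hypothetical stand-in for the per-map shuffle write in the tester.
    def simulateWrite(workerId: Int): Long = {
      val payload = "1" * 1000                // ~1KB record, as in the tester
      var written = 0L
      for (_ <- 1 to 10000) written += payload.length
      written
    }

    val start = System.currentTimeMillis()
    for (id <- 1 to numWorkers) {
      executor.submit(new Runnable {
        override def run(): Unit = {
          try {
            totalBytes.addAndGet(simulateWrite(id))
          } finally {
            // Count down even on failure so the main thread cannot hang.
            latch.countDown()
          }
        }
      })
    }
    latch.await()                             // block until all workers finish
    val seconds = (System.currentTimeMillis() - start) / 1000.0

    println(f"total_bytes\t${totalBytes.get()}%d")
    println(f"throughput\t${totalBytes.get() / math.max(seconds, 0.001)}%.0f bytes/s")

    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.MINUTES)
  }
}
----------------------------------------------------------------------

One deliberate difference from the code in the diff: the sketch counts the
latch down in a finally block, so a failed worker cannot leave the main thread
blocked on latch.await(). The tester instead calls System.exit(1) from its
catch block, which ends the whole run outright and makes the distinction moot
for a throwaway benchmark.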
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31da065b/core/src/main/scala/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/StoragePerfTester.scala b/core/src/main/scala/spark/storage/StoragePerfTester.scala
deleted file mode 100644
index 68893a2..0000000
--- a/core/src/main/scala/spark/storage/StoragePerfTester.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.spark.storage
-
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{CountDownLatch, Executors}
-
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
-import org.apache.spark.util.Utils
-
-/** Utility for micro-benchmarking shuffle write performance.
- *
- * Writes simulated shuffle output from several threads and records the observed throughput*/
-object StoragePerfTester {
- def main(args: Array[String]) = {
- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
- /** Number of map tasks. All tasks execute concurrently. */
- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
- /** Number of reduce splits for each map task. */
- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
- val recordLength = 1000 // ~1KB records
- val totalRecords = dataSizeMb * 1000
- val recordsPerMap = totalRecords / numMaps
-
- val writeData = "1" * recordLength
- val executor = Executors.newFixedThreadPool(numMaps)
-
- System.setProperty("spark.shuffle.compress", "false")
- System.setProperty("spark.shuffle.sync", "true")
-
- // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
- val sc = new SparkContext("local[4]", "Write Tester")
- val blockManager = sc.env.blockManager
-
- def writeOutputBytes(mapId: Int, total: AtomicLong) = {
- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
- new KryoSerializer())
- val writers = shuffle.writers
- for (i <- 1 to recordsPerMap) {
- writers(i % numOutputSplits).write(writeData)
- }
- writers.map {w =>
- w.commit()
- total.addAndGet(w.fileSegment().length)
- w.close()
- }
-
- shuffle.releaseWriters(true)
- }
-
- val start = System.currentTimeMillis()
- val latch = new CountDownLatch(numMaps)
- val totalBytes = new AtomicLong()
- for (task <- 1 to numMaps) {
- executor.submit(new Runnable() {
- override def run() = {
- try {
- writeOutputBytes(task, totalBytes)
- latch.countDown()
- } catch {
- case e: Exception =>
- println("Exception in child thread: " + e + " " + e.getMessage)
- System.exit(1)
- }
- }
- })
- }
- latch.await()
- val end = System.currentTimeMillis()
- val time = (end - start) / 1000.0
- val bytesPerSecond = totalBytes.get() / time
- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
-
- executor.shutdown()
- sc.stop()
- }
-}