You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/04 23:03:38 UTC

git commit: Merge pull request #95 from aarondav/perftest

Updated Branches:
  refs/heads/branch-0.8 daaaee175 -> 31da065b1


Merge pull request #95 from aarondav/perftest

Minor: Put StoragePerfTester in org/apache/
(cherry picked from commit a51359c917a9ebe379b32ebc53fd093c454ea195)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/31da065b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/31da065b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/31da065b

Branch: refs/heads/branch-0.8
Commit: 31da065b1d08c1fad5283e4bcf8e0ed01818c03e
Parents: daaaee1
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Oct 21 20:33:29 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Dec 4 14:01:13 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/StoragePerfTester.scala       | 84 ++++++++++++++++++++
 .../scala/spark/storage/StoragePerfTester.scala | 84 --------------------
 2 files changed, 84 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31da065b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
new file mode 100644
index 0000000..68893a2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.storage
+
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CountDownLatch, Executors}
+
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
+
+/** Utility for micro-benchmarking shuffle write performance.
+  *
+  * Writes simulated shuffle output from several threads and records the observed throughput*/
+object StoragePerfTester {
+  def main(args: Array[String]) = {
+    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
+
+    /** Number of map tasks. All tasks execute concurrently. */
+    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
+
+    /** Number of reduce splits for each map task. */
+    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
+
+    val recordLength = 1000 // ~1KB records
+    val totalRecords = dataSizeMb * 1000
+    val recordsPerMap = totalRecords / numMaps
+
+    val writeData = "1" * recordLength
+    val executor = Executors.newFixedThreadPool(numMaps)
+
+    System.setProperty("spark.shuffle.compress", "false")
+    System.setProperty("spark.shuffle.sync", "true")
+
+    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
+    val sc = new SparkContext("local[4]", "Write Tester")
+    val blockManager = sc.env.blockManager
+
+    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+        new KryoSerializer())
+      val writers = shuffle.writers
+      for (i <- 1 to recordsPerMap) {
+        writers(i % numOutputSplits).write(writeData)
+      }
+      writers.map {w =>
+        w.commit()
+        total.addAndGet(w.fileSegment().length)
+        w.close()
+      }
+
+      shuffle.releaseWriters(true)
+    }
+
+    val start = System.currentTimeMillis()
+    val latch = new CountDownLatch(numMaps)
+    val totalBytes = new AtomicLong()
+    for (task <- 1 to numMaps) {
+      executor.submit(new Runnable() {
+        override def run() = {
+          try {
+            writeOutputBytes(task, totalBytes)
+            latch.countDown()
+          } catch {
+            case e: Exception =>
+              println("Exception in child thread: " + e + " " + e.getMessage)
+              System.exit(1)
+          }
+        }
+      })
+    }
+    latch.await()
+    val end = System.currentTimeMillis()
+    val time = (end - start) / 1000.0
+    val bytesPerSecond = totalBytes.get() / time
+    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
+
+    executor.shutdown()
+    sc.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31da065b/core/src/main/scala/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/StoragePerfTester.scala b/core/src/main/scala/spark/storage/StoragePerfTester.scala
deleted file mode 100644
index 68893a2..0000000
--- a/core/src/main/scala/spark/storage/StoragePerfTester.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.spark.storage
-
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{CountDownLatch, Executors}
-
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
-import org.apache.spark.util.Utils
-
-/** Utility for micro-benchmarking shuffle write performance.
-  *
-  * Writes simulated shuffle output from several threads and records the observed throughput*/
-object StoragePerfTester {
-  def main(args: Array[String]) = {
-    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
-    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
-    /** Number of map tasks. All tasks execute concurrently. */
-    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
-    /** Number of reduce splits for each map task. */
-    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
-    val recordLength = 1000 // ~1KB records
-    val totalRecords = dataSizeMb * 1000
-    val recordsPerMap = totalRecords / numMaps
-
-    val writeData = "1" * recordLength
-    val executor = Executors.newFixedThreadPool(numMaps)
-
-    System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.shuffle.sync", "true")
-
-    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester")
-    val blockManager = sc.env.blockManager
-
-    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
-      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
-        new KryoSerializer())
-      val writers = shuffle.writers
-      for (i <- 1 to recordsPerMap) {
-        writers(i % numOutputSplits).write(writeData)
-      }
-      writers.map {w =>
-        w.commit()
-        total.addAndGet(w.fileSegment().length)
-        w.close()
-      }
-
-      shuffle.releaseWriters(true)
-    }
-
-    val start = System.currentTimeMillis()
-    val latch = new CountDownLatch(numMaps)
-    val totalBytes = new AtomicLong()
-    for (task <- 1 to numMaps) {
-      executor.submit(new Runnable() {
-        override def run() = {
-          try {
-            writeOutputBytes(task, totalBytes)
-            latch.countDown()
-          } catch {
-            case e: Exception =>
-              println("Exception in child thread: " + e + " " + e.getMessage)
-              System.exit(1)
-          }
-        }
-      })
-    }
-    latch.await()
-    val end = System.currentTimeMillis()
-    val time = (end - start) / 1000.0
-    val bytesPerSecond = totalBytes.get() / time
-    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
-    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
-    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
-    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
-
-    executor.shutdown()
-    sc.stop()
-  }
-}