Posted to commits@spark.apache.org by jo...@apache.org on 2014/09/26 18:27:56 UTC

git commit: [SPARK-3478] [PySpark] Profile the Python tasks

Repository: spark
Updated Branches:
  refs/heads/master b235e0136 -> 1aa549ba9


[SPARK-3478] [PySpark] Profile the Python tasks

This patch adds profiling support for PySpark. It will show the profiling results
before the driver exits; here is one example:

```
============================================================
Profile of RDD<id=3>
============================================================
         5146507 function calls (5146487 primitive calls) in 71.094 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
       20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
       20    0.017    0.001    0.017    0.001 {cPickle.dumps}
     1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
       20    0.001    0.000    0.001    0.000 {reduce}
       21    0.001    0.000    0.001    0.000 {cPickle.loads}
       20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
       41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
       40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
       62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
       20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
       20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
    40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
       41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
       40    0.000    0.000   71.072    1.777 rdd.py:304(func)
       20    0.000    0.000   71.094    3.555 worker.py:82(process)
```

Also, users can show the profile results manually with `sc.show_profiles()` or dump them to disk
with `sc.dump_profiles(path)`, for example:

```python
>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
         284 function calls (276 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
        4    0.000    0.000    0.000    0.000 {reduce}
     12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
        4    0.000    0.000    0.000    0.000 {cPickle.loads}
        4    0.000    0.000    0.000    0.000 {cPickle.dumps}
      104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
        8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
       12    0.000    0.000    0.000    0.000 rdd.py:303(func)
```
Profiling is disabled by default; it can be enabled by setting "spark.python.profile=true".

Also, users can have the results dumped to disk automatically for later analysis by setting "spark.python.profile.dump=path_to_dump"
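
A minimal sketch of turning both settings on through `SparkConf` (the master, app name, and
dump directory below are placeholders, not part of this patch):

```python
# Hypothetical example: enable Python profiling and automatic dumping.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[2]")
        .setAppName("profiling-demo")
        .set("spark.python.profile", "true")
        .set("spark.python.profile.dump", "/tmp/pyspark_profiles"))
sc = SparkContext(conf=conf)

sc.parallelize(range(1000)).map(str).count()
sc.show_profiles()                           # print the collected stats now
sc.dump_profiles("/tmp/pyspark_profiles")    # or write rdd_<id>.pstats files
```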

Author: Davies Liu <da...@gmail.com>

Closes #2351 from davies/profiler and squashes the following commits:

7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
2b0daf2 [Davies Liu] fix docs
7a56c24 [Davies Liu] bugfix
cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
09d02c3 [Davies Liu] Merge branch 'master' into profiler
c23865c [Davies Liu] Merge branch 'master' into profiler
15d6f18 [Davies Liu] add docs for two configs
dadee1a [Davies Liu] add docs string and clear profiles after show or dump
4f8309d [Davies Liu] address comment, add tests
0a5b6eb [Davies Liu] fix Python UDF
4b20494 [Davies Liu] add profile for python


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1aa549ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1aa549ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1aa549ba

Branch: refs/heads/master
Commit: 1aa549ba9839565274a12c52fa1075b424f138a6
Parents: b235e01
Author: Davies Liu <da...@gmail.com>
Authored: Fri Sep 26 09:27:42 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Fri Sep 26 09:27:42 2014 -0700

----------------------------------------------------------------------
 docs/configuration.md          | 19 ++++++++++++++++++
 python/pyspark/accumulators.py | 15 ++++++++++++++
 python/pyspark/context.py      | 39 ++++++++++++++++++++++++++++++++++++-
 python/pyspark/rdd.py          | 10 ++++++++--
 python/pyspark/sql.py          |  2 +-
 python/pyspark/tests.py        | 30 ++++++++++++++++++++++++++++
 python/pyspark/worker.py       | 19 +++++++++++++++---
 7 files changed, 127 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1aa549ba/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index a6dd724..791b6f2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -207,6 +207,25 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.python.profile</code></td>
+  <td>false</td>
+  <td>
+    Enable profiling in the Python worker. The profile results will be shown by
+    `sc.show_profiles()`, or displayed before the driver exits. They can also be
+    dumped to disk by `sc.dump_profiles(path)`. If some of the profile results have
+    been displayed manually, they will not be displayed automatically before the driver exits.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.python.profile.dump</code></td>
+  <td>(none)</td>
+  <td>
+    The directory which is used to dump the profile results before the driver exits.
+    The results will be dumped as a separate file for each RDD. They can be loaded
+    by pstats.Stats(). If this is specified, the profile results will not be displayed
+    automatically.
+</tr>
+<tr>
   <td><code>spark.python.worker.reuse</code></td>
   <td>true</td>
   <td>
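
The dumped files are ordinary pstats dumps, so they can be inspected offline with the
standard library; a minimal sketch, assuming a placeholder dump directory and an RDD id of 1:

```python
# Hypothetical example: load a dumped profile with the standard pstats module.
# Files are written as rdd_<id>.pstats by sc.dump_profiles(path).
import pstats

stats = pstats.Stats("/tmp/pyspark_profiles/rdd_1.pstats")
stats.sort_stats("tottime", "cumtime").print_stats(10)   # show the top 10 entries
```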

http://git-wip-us.apache.org/repos/asf/spark/blob/1aa549ba/python/pyspark/accumulators.py
----------------------------------------------------------------------
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index ccbca67..b8cdbbe 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -215,6 +215,21 @@ FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
 COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
 
 
+class PStatsParam(AccumulatorParam):
+    """PStatsParam is used to merge pstats.Stats"""
+
+    @staticmethod
+    def zero(value):
+        return None
+
+    @staticmethod
+    def addInPlace(value1, value2):
+        if value1 is None:
+            return value2
+        value1.add(value2)
+        return value1
+
+
 class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
 
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/1aa549ba/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 8e7b004..abeda19 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,6 +20,7 @@ import shutil
 import sys
 from threading import Lock
 from tempfile import NamedTemporaryFile
+import atexit
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -30,7 +31,6 @@ from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, CompressedSerializer
 from pyspark.storagelevel import StorageLevel
-from pyspark import rdd
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
 
@@ -192,6 +192,9 @@ class SparkContext(object):
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+        # profiling stats collected for each PythonRDD
+        self._profile_stats = []
+
     def _initialize_context(self, jconf):
         """
         Initialize SparkContext in function to allow subclass specific initialization
@@ -792,6 +795,40 @@ class SparkContext(object):
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))
 
+    def _add_profile(self, id, profileAcc):
+        if not self._profile_stats:
+            dump_path = self._conf.get("spark.python.profile.dump")
+            if dump_path:
+                atexit.register(self.dump_profiles, dump_path)
+            else:
+                atexit.register(self.show_profiles)
+
+        self._profile_stats.append([id, profileAcc, False])
+
+    def show_profiles(self):
+        """ Print the profile stats to stdout """
+        for i, (id, acc, showed) in enumerate(self._profile_stats):
+            stats = acc.value
+            if not showed and stats:
+                print "=" * 60
+                print "Profile of RDD<id=%d>" % id
+                print "=" * 60
+                stats.sort_stats("tottime", "cumtime").print_stats()
+                # mark it as showed
+                self._profile_stats[i][2] = True
+
+    def dump_profiles(self, path):
+        """ Dump the profile stats into directory `path`
+        """
+        if not os.path.exists(path):
+            os.makedirs(path)
+        for id, acc, _ in self._profile_stats:
+            stats = acc.value
+            if stats:
+                p = os.path.join(path, "rdd_%d.pstats" % id)
+                stats.dump_stats(p)
+        self._profile_stats = []
+
 
 def _test():
     import atexit

http://git-wip-us.apache.org/repos/asf/spark/blob/1aa549ba/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 680140d..8ed89e2 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 
-from base64 import standard_b64encode as b64enc
 import copy
 from collections import defaultdict
 from itertools import chain, ifilter, imap
@@ -32,6 +31,7 @@ import bisect
 from random import Random
 from math import sqrt, log, isinf, isnan
 
+from pyspark.accumulators import PStatsParam
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
@@ -2080,7 +2080,9 @@ class PipelinedRDD(RDD):
             return self._jrdd_val
         if self._bypass_serializer:
             self._jrdd_deserializer = NoOpSerializer()
-        command = (self.func, self._prev_jrdd_deserializer,
+        enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
+        profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
+        command = (self.func, profileStats, self._prev_jrdd_deserializer,
                    self._jrdd_deserializer)
         # the serialized command will be compressed by broadcast
         ser = CloudPickleSerializer()
@@ -2102,6 +2104,10 @@ class PipelinedRDD(RDD):
                                              self.ctx.pythonExec,
                                              broadcast_vars, self.ctx._javaAccumulator)
         self._jrdd_val = python_rdd.asJavaRDD()
+
+        if enable_profile:
+            self._id = self._jrdd_val.id()
+            self.ctx._add_profile(self._id, profileStats)
         return self._jrdd_val
 
     def id(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/1aa549ba/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 653195e..ee5bda8 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -974,7 +974,7 @@ class SQLContext(object):
         [Row(c0=4)]
         """
         func = lambda _, it: imap(lambda x: f(*x), it)
-        command = (func,
+        command = (func, None,
                    BatchedSerializer(PickleSerializer(), 1024),
                    BatchedSerializer(PickleSerializer(), 1024))
         ser = CloudPickleSerializer()

http://git-wip-us.apache.org/repos/asf/spark/blob/1aa549ba/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index d1bb203..e6002af 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -632,6 +632,36 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEquals(result.count(), 3)
 
 
+class TestProfiler(PySparkTestCase):
+
+    def setUp(self):
+        self._old_sys_path = list(sys.path)
+        class_name = self.__class__.__name__
+        conf = SparkConf().set("spark.python.profile", "true")
+        self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
+
+    def test_profiler(self):
+
+        def heavy_foo(x):
+            for i in range(1 << 20):
+                x = 1
+        rdd = self.sc.parallelize(range(100))
+        rdd.foreach(heavy_foo)
+        profiles = self.sc._profile_stats
+        self.assertEqual(1, len(profiles))
+        id, acc, _ = profiles[0]
+        stats = acc.value
+        self.assertTrue(stats is not None)
+        width, stat_list = stats.get_print_list([])
+        func_names = [func_name for fname, n, func_name in stat_list]
+        self.assertTrue("heavy_foo" in func_names)
+
+        self.sc.show_profiles()
+        d = tempfile.gettempdir()
+        self.sc.dump_profiles(d)
+        self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
+
+
 class TestSQL(PySparkTestCase):
 
     def setUp(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/1aa549ba/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c1f6e3e..8257ddd 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,6 +23,8 @@ import sys
 import time
 import socket
 import traceback
+import cProfile
+import pstats
 
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -90,10 +92,21 @@ def main(infile, outfile):
         command = pickleSer._read_with_length(infile)
         if isinstance(command, Broadcast):
             command = pickleSer.loads(command.value)
-        (func, deserializer, serializer) = command
+        (func, stats, deserializer, serializer) = command
         init_time = time.time()
-        iterator = deserializer.load_stream(infile)
-        serializer.dump_stream(func(split_index, iterator), outfile)
+
+        def process():
+            iterator = deserializer.load_stream(infile)
+            serializer.dump_stream(func(split_index, iterator), outfile)
+
+        if stats:
+            p = cProfile.Profile()
+            p.runcall(process)
+            st = pstats.Stats(p)
+            st.stream = None  # make it picklable
+            stats.add(st.strip_dirs())
+        else:
+            process()
     except Exception:
         try:
             write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
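
The pattern above (profile the whole `process()` call under cProfile, then turn it into a
picklable `pstats.Stats` so the accumulator can ship it back to the driver) can be
reproduced on its own; a minimal sketch with a stand-in for `process()`:

```python
# Illustration only: mirror the worker-side profiling pattern.
# process() is a stand-in for the real deserialize/compute/serialize loop.
import cProfile
import pickle
import pstats

def process():
    return [str(x) for x in range(10000)]

p = cProfile.Profile()
p.runcall(process)
st = pstats.Stats(p)
st.stream = None                               # drop the output stream so Stats pickles
blob = pickle.dumps(st.strip_dirs())           # roughly what the accumulator carries
pickle.loads(blob).sort_stats("tottime", "cumtime").print_stats(5)
```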

