You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/08 01:59:32 UTC

[2/5] incubator-beam git commit: Add a test to check memory consumption of the direct runner.

Add a test to check memory consumption of the direct runner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d021e9ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d021e9ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d021e9ce

Branch: refs/heads/python-sdk
Commit: d021e9ce7bccb770e187c720bd7a95bb99f1bc8e
Parents: 7f201cb
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 3 15:37:49 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Nov 7 17:56:49 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py | 37 +++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d021e9ce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index db3ad9e..a4c983f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -180,6 +180,43 @@ class PipelineTest(unittest.TestCase):
         ['a-x', 'b-x', 'c-x'],
         sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
 
+  def test_memory_usage(self):
+    try:
+      import resource
+    except ImportError:
+      # Skip the test if resource module is not available (e.g. non-Unix os).
+      self.skipTest('resource module not available.')
+    # NOTE(review): 2**10 assumes ru_maxrss is in kilobytes -- confirm per OS.
+    def get_memory_usage_in_bytes():
+      return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * (2 ** 10)
+    # Pass-through fn for Map: raises once usage exceeds memory_threshold.
+    def check_memory(value, memory_threshold):
+      memory_usage = get_memory_usage_in_bytes()
+      if memory_usage > memory_threshold:
+        raise RuntimeError(
+            'High memory usage: %d > %d' % (memory_usage, memory_threshold))
+      return value
+    # Chars per element, number of elements, and number of chained Map steps.
+    len_elements = 1000000
+    num_elements = 10
+    num_maps = 100
+
+    pipeline = Pipeline('DirectPipelineRunner')
+    # Budget: usage measured now plus 3x (len_elements * num_elements).
+    # Consumed memory should not be proportional to the number of maps.
+    memory_threshold = (
+        get_memory_usage_in_bytes() + (3 * len_elements * num_elements))
+    # Chain num_maps Maps that each append 'y', then run check_memory last.
+    biglist = pipeline | 'oom:create' >> Create(
+        ['x' * len_elements] * num_elements)
+    for i in range(num_maps):
+      biglist = biglist | ('oom:addone-%d' % i) >> Map(lambda x: x + 'y')
+    result = biglist | 'oom:check' >> Map(check_memory, memory_threshold)
+    assert_that(result, equal_to(
+        ['x' * len_elements + 'y' * num_maps] * num_elements))
+
+    pipeline.run()
+
   def test_pipeline_as_context(self):
     def raise_exception(exn):
       raise exn