You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/08 01:59:32 UTC
[2/5] incubator-beam git commit: Add a test to check memory
consumption of the direct runner.
Add a test to check memory consumption of the direct runner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d021e9ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d021e9ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d021e9ce
Branch: refs/heads/python-sdk
Commit: d021e9ce7bccb770e187c720bd7a95bb99f1bc8e
Parents: 7f201cb
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 3 15:37:49 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Nov 7 17:56:49 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline_test.py | 37 +++++++++++++++++++++++++++
1 file changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d021e9ce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index db3ad9e..a4c983f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -180,6 +180,43 @@ class PipelineTest(unittest.TestCase):
['a-x', 'b-x', 'c-x'],
sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
+ def test_memory_usage(self):
+   """Checks that direct runner memory does not grow with the number of maps.
+
+   Builds a chain of num_maps Map steps over num_elements large strings and
+   fails (from inside the pipeline) if the peak resident set size of this
+   process exceeds the value measured before the pipeline was populated by
+   more than 3 * len_elements * num_elements bytes.
+   """
+   import sys
+   try:
+     import resource
+   except ImportError:
+     # Skip the test if resource module is not available (e.g. non-Unix os).
+     self.skipTest('resource module not available.')
+
+   def get_memory_usage_in_bytes():
+     # ru_maxrss is the peak resident set size of this process. Its unit is
+     # platform dependent: getrusage(2) documents kilobytes on Linux but
+     # bytes on macOS, so only scale it where it is not already in bytes.
+     max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+     if sys.platform == 'darwin':
+       return max_rss
+     return max_rss * (2 ** 10)
+
+   def check_memory(value, memory_threshold):
+     # Pass-through Map function: returns the element unchanged, but raises
+     # if the process peak memory has gone above the threshold.
+     memory_usage = get_memory_usage_in_bytes()
+     if memory_usage > memory_threshold:
+       raise RuntimeError(
+           'High memory usage: %d > %d' % (memory_usage, memory_threshold))
+     return value
+
+   len_elements = 1000000
+   num_elements = 10
+   num_maps = 100
+
+   pipeline = Pipeline('DirectPipelineRunner')
+
+   # Consumed memory should not be proportional to the number of maps.
+   # The allowance is a fixed multiple of the input size, independent of
+   # num_maps, on top of the peak usage observed before the run.
+   memory_threshold = (
+       get_memory_usage_in_bytes() + (3 * len_elements * num_elements))
+
+   biglist = pipeline | 'oom:create' >> Create(
+       ['x' * len_elements] * num_elements)
+   for i in range(num_maps):
+     # Each step needs a unique label, hence the index in the name.
+     biglist = biglist | ('oom:addone-%d' % i) >> Map(lambda x: x + 'y')
+   result = biglist | 'oom:check' >> Map(check_memory, memory_threshold)
+   assert_that(result, equal_to(
+       ['x' * len_elements + 'y' * num_maps] * num_elements))
+
+   pipeline.run()
+
def test_pipeline_as_context(self):
def raise_exception(exn):
raise exn