You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/05/08 19:07:49 UTC
[beam] branch master updated: Adding a microbenchmark for side input iterables. (#5294)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 08c91f6 Adding a microbenchmark for side input iterables. (#5294)
08c91f6 is described below
commit 08c91f6e80cae7f322a243e547f971743563fac2
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue May 8 12:07:42 2018 -0700
Adding a microbenchmark for side input iterables. (#5294)
* Adding a microbenchmark for side input iterables.
---
.../apache_beam/tools/sideinput_microbenchmark.py | 75 ++++++++++++++++++++++
1 file changed, 75 insertions(+)
diff --git a/sdks/python/apache_beam/tools/sideinput_microbenchmark.py b/sdks/python/apache_beam/tools/sideinput_microbenchmark.py
new file mode 100644
index 0000000..15e1b9b
--- /dev/null
+++ b/sdks/python/apache_beam/tools/sideinput_microbenchmark.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""A microbenchmark for measuring the performance of side input iterables.
+
+Runs a side input iterable that reads from multiple FakeSources, and
+measures the time taken to fetch all elements from all sources.
+
+Run as
+ python -m apache_beam.tools.sideinput_microbenchmark
+"""
+
+from __future__ import print_function
+
+import time
+
+from apache_beam.runners.worker import opcounters
+from apache_beam.runners.worker import sideinputs
+from apache_beam.runners.worker import statesampler
+from apache_beam.runners.worker.sideinputs_test import FakeSource
+from apache_beam.tools import utils
+from apache_beam.utils.counters import CounterFactory
+
+
def long_generator(value, elements):
  """Lazily yield ``value`` repeated ``elements`` times.

  A count of zero or less produces nothing.
  """
  remaining = len(range(elements))
  while remaining:
    yield value
    remaining -= 1
+
+
def run_benchmark(num_runs=50, input_per_source=4000, num_sources=4):
  """Time draining a side input iterable built from several FakeSources.

  Each run builds ``num_sources`` FakeSources, each yielding
  ``input_per_source`` copies of its source index. It wraps them with
  ``sideinputs.get_iterator_fn_for_sources`` and measures how long it takes
  to fetch every element. The per-run timings, their average and the average
  time per element are printed to stdout.

  Args:
    num_runs: number of timed repetitions.
    input_per_source: number of elements produced by each FakeSource.
    num_sources: number of FakeSources feeding the side input.
  """
  # Prefer time.perf_counter (Python 3.3+): it is monotonic, has high
  # resolution and is unaffected by system clock adjustments. Fall back to
  # time.time() on Python 2, where perf_counter does not exist.
  timer = getattr(time, 'perf_counter', time.time)

  num_elements = num_sources * input_per_source
  print("Number of runs:", num_runs)
  print("Input size:", num_elements)
  print("Sources:", num_sources)

  times = []
  for _ in range(num_runs):
    counter_factory = CounterFactory()
    state_sampler = statesampler.StateSampler('basic', counter_factory)
    with state_sampler.scoped_state('step1', 'state'):
      # NOTE(review): this SideInputReadCounter is immediately replaced by a
      # no-op counter, so the timed loop runs without read-counter overhead.
      # Construction is kept so the per-run setup is unchanged. Confirm which
      # counter the benchmark is meant to measure.
      si_counter = opcounters.SideInputReadCounter(
          counter_factory, state_sampler, 'step1', 1)
      si_counter = opcounters.NoOpTransformIOCounter()
      sources = [
          FakeSource(long_generator(source_index, input_per_source))
          for source_index in range(num_sources)]
      iterator_fn = sideinputs.get_iterator_fn_for_sources(
          sources, read_counter=si_counter)
      start = timer()
      # Materializing the result forces every element to be fetched.
      list(iterator_fn())
      times.append(timer() - start)

  print("Runtimes:", times)

  if not times:
    # num_runs <= 0: nothing was timed, so there is no average to report.
    return
  avg_runtime = sum(times) / len(times)
  print("Average runtime:", avg_runtime)
  if num_elements:
    # Skip when there were no elements, which would divide by zero.
    print("Time per element:", avg_runtime / num_elements)
+
+
if __name__ == '__main__':
  # NOTE(review): presumably checks that opcounters is the compiled
  # (Cython) build before benchmarking; see apache_beam.tools.utils to
  # confirm what happens when it is not.
  utils.check_compiled('apache_beam.runners.worker.opcounters')
  run_benchmark()
--
To stop receiving notification emails like this one, please contact
pabloem@apache.org.