You are viewing a plain-text version of this content. The canonical link to it (shown as "here" on the original page) was not preserved in this copy.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/02/21 13:14:49 UTC
[beam] branch revert-10885-synthetic-source-perf-fix created (now 683d03f)
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a change to branch revert-10885-synthetic-source-perf-fix
in repository https://gitbox.apache.org/repos/asf/beam.git.
at 683d03f Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
This branch includes the following new commits:
new 683d03f Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[beam] 01/01: Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
Posted by mw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a commit to branch revert-10885-synthetic-source-perf-fix
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 683d03f384789b141445c5c8ca172853e320a26a
Author: Michal Walenia <32...@users.noreply.github.com>
AuthorDate: Fri Feb 21 14:14:34 2020 +0100
Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
This reverts commit b4668a1a1e234c071c3a7b182a76f8f4cf6bfe64.
---
.../apache_beam/testing/synthetic_pipeline.py | 25 +++++++---------------
1 file changed, 8 insertions(+), 17 deletions(-)
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py
index af21e6e..6dd41c8 100644
--- a/sdks/python/apache_beam/testing/synthetic_pipeline.py
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -41,10 +41,7 @@ import argparse
import json
import logging
import math
-import random
-import struct
import time
-from builtins import range
import apache_beam as beam
from apache_beam.io import WriteToText
@@ -418,31 +415,25 @@ class SyntheticSource(iobase.BoundedSource):
tracker = range_trackers.UnsplittableRangeTracker(tracker)
return tracker
- @staticmethod
- def random_bytes(length, generator):
- """Return random bytes."""
- return b''.join(
- (struct.pack('B', generator.getrandbits(8)) for _ in range(length)))
-
- def _gen_kv_pair(self, generator, index):
- generator.seed(index)
- rand = generator.random()
+ def _gen_kv_pair(self, index):
+ r = np.random.RandomState(index)
+ rand = r.random_sample()
# Determines whether to generate hot key or not.
if rand < self._hot_key_fraction:
# Generate hot key.
# An integer is randomly selected from the range [0, numHotKeys-1]
# with equal probability.
- generator.seed(index % self._num_hot_keys)
- return self.random_bytes(self._key_size, generator), self.random_bytes(
- self._value_size, generator)
+ r_hot = np.random.RandomState(index % self._num_hot_keys)
+ return r_hot.bytes(self._key_size), r.bytes(self._value_size)
+ else:
+ return r.bytes(self._key_size), r.bytes(self._value_size)
def read(self, range_tracker):
index = range_tracker.start_position()
- generator = random.Random()
while range_tracker.try_claim(index):
time.sleep(self._sleep_per_input_record_sec)
- yield self._gen_kv_pair(generator, index)
+ yield self._gen_kv_pair(index)
index += 1
def default_output_coder(self):