You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this rendering.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/02/21 13:14:50 UTC
[beam] 01/01: Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a commit to branch revert-10885-synthetic-source-perf-fix
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 683d03f384789b141445c5c8ca172853e320a26a
Author: Michal Walenia <32...@users.noreply.github.com>
AuthorDate: Fri Feb 21 14:14:34 2020 +0100
Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
This reverts commit b4668a1a1e234c071c3a7b182a76f8f4cf6bfe64.
---
.../apache_beam/testing/synthetic_pipeline.py | 25 +++++++---------------
1 file changed, 8 insertions(+), 17 deletions(-)
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py
index af21e6e..6dd41c8 100644
--- a/sdks/python/apache_beam/testing/synthetic_pipeline.py
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -41,10 +41,7 @@ import argparse
import json
import logging
import math
-import random
-import struct
import time
-from builtins import range
import apache_beam as beam
from apache_beam.io import WriteToText
@@ -418,31 +415,25 @@ class SyntheticSource(iobase.BoundedSource):
tracker = range_trackers.UnsplittableRangeTracker(tracker)
return tracker
- @staticmethod
- def random_bytes(length, generator):
- """Return random bytes."""
- return b''.join(
- (struct.pack('B', generator.getrandbits(8)) for _ in range(length)))
-
- def _gen_kv_pair(self, generator, index):
- generator.seed(index)
- rand = generator.random()
+ def _gen_kv_pair(self, index):
+ r = np.random.RandomState(index)
+ rand = r.random_sample()
# Determines whether to generate hot key or not.
if rand < self._hot_key_fraction:
# Generate hot key.
# An integer is randomly selected from the range [0, numHotKeys-1]
# with equal probability.
- generator.seed(index % self._num_hot_keys)
- return self.random_bytes(self._key_size, generator), self.random_bytes(
- self._value_size, generator)
+ r_hot = np.random.RandomState(index % self._num_hot_keys)
+ return r_hot.bytes(self._key_size), r.bytes(self._value_size)
+ else:
+ return r.bytes(self._key_size), r.bytes(self._value_size)
def read(self, range_tracker):
index = range_tracker.start_position()
- generator = random.Random()
while range_tracker.try_claim(index):
time.sleep(self._sleep_per_input_record_sec)
- yield self._gen_kv_pair(generator, index)
+ yield self._gen_kv_pair(index)
index += 1
def default_output_coder(self):