You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@beam.apache.org by mw...@apache.org on 2020/02/21 13:14:50 UTC

[beam] 01/01: Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch revert-10885-synthetic-source-perf-fix
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 683d03f384789b141445c5c8ca172853e320a26a
Author: Michal Walenia <32...@users.noreply.github.com>
AuthorDate: Fri Feb 21 14:14:34 2020 +0100

    Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
    
    This reverts commit b4668a1a1e234c071c3a7b182a76f8f4cf6bfe64.
---
 .../apache_beam/testing/synthetic_pipeline.py      | 25 +++++++---------------
 1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py
index af21e6e..6dd41c8 100644
--- a/sdks/python/apache_beam/testing/synthetic_pipeline.py
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -41,10 +41,7 @@ import argparse
 import json
 import logging
 import math
-import random
-import struct
 import time
-from builtins import range
 
 import apache_beam as beam
 from apache_beam.io import WriteToText
@@ -418,31 +415,25 @@ class SyntheticSource(iobase.BoundedSource):
       tracker = range_trackers.UnsplittableRangeTracker(tracker)
     return tracker
 
-  @staticmethod
-  def random_bytes(length, generator):
-    """Return random bytes."""
-    return b''.join(
-        (struct.pack('B', generator.getrandbits(8)) for _ in range(length)))
-
-  def _gen_kv_pair(self, generator, index):
-    generator.seed(index)
-    rand = generator.random()
+  def _gen_kv_pair(self, index):
+    r = np.random.RandomState(index)
+    rand = r.random_sample()
 
     # Determines whether to generate hot key or not.
     if rand < self._hot_key_fraction:
       # Generate hot key.
       # An integer is randomly selected from the range [0, numHotKeys-1]
       # with equal probability.
-      generator.seed(index % self._num_hot_keys)
-    return self.random_bytes(self._key_size, generator), self.random_bytes(
-      self._value_size, generator)
+      r_hot = np.random.RandomState(index % self._num_hot_keys)
+      return r_hot.bytes(self._key_size), r.bytes(self._value_size)
+    else:
+      return r.bytes(self._key_size), r.bytes(self._value_size)
 
   def read(self, range_tracker):
     index = range_tracker.start_position()
-    generator = random.Random()
     while range_tracker.try_claim(index):
       time.sleep(self._sleep_per_input_record_sec)
-      yield self._gen_kv_pair(generator, index)
+      yield self._gen_kv_pair(index)
       index += 1
 
   def default_output_coder(self):