You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/02/21 13:14:49 UTC

[beam] branch revert-10885-synthetic-source-perf-fix created (now 683d03f)

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a change to branch revert-10885-synthetic-source-perf-fix
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 683d03f  Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"

This branch includes the following new commits:

     new 683d03f  Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"

Posted by mw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch revert-10885-synthetic-source-perf-fix
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 683d03f384789b141445c5c8ca172853e320a26a
Author: Michal Walenia <32...@users.noreply.github.com>
AuthorDate: Fri Feb 21 14:14:34 2020 +0100

    Revert "[BEAM-9085] Fix performance regression in SyntheticSource (#10885)"
    
    This reverts commit b4668a1a1e234c071c3a7b182a76f8f4cf6bfe64.
---
 .../apache_beam/testing/synthetic_pipeline.py      | 25 +++++++---------------
 1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py b/sdks/python/apache_beam/testing/synthetic_pipeline.py
index af21e6e..6dd41c8 100644
--- a/sdks/python/apache_beam/testing/synthetic_pipeline.py
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -41,10 +41,7 @@ import argparse
 import json
 import logging
 import math
-import random
-import struct
 import time
-from builtins import range
 
 import apache_beam as beam
 from apache_beam.io import WriteToText
@@ -418,31 +415,25 @@ class SyntheticSource(iobase.BoundedSource):
       tracker = range_trackers.UnsplittableRangeTracker(tracker)
     return tracker
 
-  @staticmethod
-  def random_bytes(length, generator):
-    """Return random bytes."""
-    return b''.join(
-        (struct.pack('B', generator.getrandbits(8)) for _ in range(length)))
-
-  def _gen_kv_pair(self, generator, index):
-    generator.seed(index)
-    rand = generator.random()
+  def _gen_kv_pair(self, index):
+    r = np.random.RandomState(index)
+    rand = r.random_sample()
 
     # Determines whether to generate hot key or not.
     if rand < self._hot_key_fraction:
       # Generate hot key.
       # An integer is randomly selected from the range [0, numHotKeys-1]
       # with equal probability.
-      generator.seed(index % self._num_hot_keys)
-    return self.random_bytes(self._key_size, generator), self.random_bytes(
-      self._value_size, generator)
+      r_hot = np.random.RandomState(index % self._num_hot_keys)
+      return r_hot.bytes(self._key_size), r.bytes(self._value_size)
+    else:
+      return r.bytes(self._key_size), r.bytes(self._value_size)
 
   def read(self, range_tracker):
     index = range_tracker.start_position()
-    generator = random.Random()
     while range_tracker.try_claim(index):
       time.sleep(self._sleep_per_input_record_sec)
-      yield self._gen_kv_pair(generator, index)
+      yield self._gen_kv_pair(index)
       index += 1
 
   def default_output_coder(self):