Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/27 19:26:29 UTC

[GitHub] [beam] pabloem commented on a change in pull request #15753: [BEAM-13080] Add option in Reshuffle to set num of keys generated.

pabloem commented on a change in pull request #15753:
URL: https://github.com/apache/beam/pull/15753#discussion_r737778631



##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -723,12 +723,33 @@ class Reshuffle(PTransform):
 
   Reshuffle is experimental. No backwards compatibility guarantees.
   """
+  def __init__(self, num_buckets=None):
+    """
+    :param num_buckets: If set, specifies the maximum random keys that would be
+      generated.
+    """
+    self.num_buckets = num_buckets
+
+    valid_buckets = isinstance(num_buckets, int) and num_buckets > 0
+    if not (num_buckets is None or valid_buckets):
+      raise ValueError(
+          'If `num_buckets` is set, it has to be an '
+          'integer greater than 0, got %s' % num_buckets)
+
   def expand(self, pcoll):
     # type: (pvalue.PValue) -> pvalue.PCollection
+    if self.num_buckets:
+      keyed = pcoll | 'AddRandomKeys' >> Map(
+          lambda t: (random.randint(0, self.num_buckets), t)).with_input_types(

Review comment:
       Can we avoid having two code paths?
   
   Perhaps set `_DEFAULT_NUM_BUCKETS = 1 << 32` and use `randint` for both the default case and the user-supplied `num_buckets`?
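
   A minimal sketch of the single-path idea, kept in plain Python so it runs without Beam. The helper name `add_random_key` is hypothetical and not part of the PR; only `_DEFAULT_NUM_BUCKETS = 1 << 32` and the use of `randint` come from the comment. One assumption worth flagging: `random.randint(a, b)` includes both endpoints, so the sketch uses `buckets - 1` as the upper bound to produce exactly `buckets` distinct keys.

```python
import random

# Default suggested in the review comment; used when the caller sets no limit.
_DEFAULT_NUM_BUCKETS = 1 << 32


def add_random_key(element, num_buckets=None):
    """Pair `element` with a random integer key in [0, buckets).

    Hypothetical helper for illustration only, not Beam API.
    """
    buckets = _DEFAULT_NUM_BUCKETS if num_buckets is None else num_buckets
    # randint is inclusive on both ends, hence the `- 1`.
    return (random.randint(0, buckets - 1), element)
```

   With this shape, `expand` would need only one `Map` step, since the default and the user-supplied bucket count go through the same call.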

Review comment:
       Other than that, LGTM.



