Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/07 23:25:45 UTC

[GitHub] [beam] udim commented on a change in pull request #13493: [BEAM-10475] Add output type hints for GroupIntoBatches.

udim commented on a change in pull request #13493:
URL: https://github.com/apache/beam/pull/13493#discussion_r537894802



##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -72,6 +72,9 @@
 from apache_beam.transforms.window import NonMergingWindowFn
 from apache_beam.transforms.window import TimestampCombiner
 from apache_beam.transforms.window import TimestampedValue
+from apache_beam.typehints.sharded_key_type import ShardedKeyType
+from apache_beam.typehints.typehints import IterableTypeConstraint
+from apache_beam.typehints.typehints import TupleConstraint

Review comment:
       Please use:
   ```python
   from apache_beam.typehints import ShardedKeyType
   ```
   And use the `Tuple` and `Iterable` types from the `typing` module (they will get converted to the ones in `typehints`). In general, use either `typing.Tuple` or `apache_beam.typehints.Tuple`; the constraint classes are internal to the `typehints` module.
   
   For the `ShardedKeyType` import to work, you can add the corresponding import statement to `apache_beam/typehints/__init__.py`. This also avoids the cyclic import you mentioned, which happened with my earlier suggestion to put `ShardedKeyType` in `typehints.py`.
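   
   For illustration, a minimal sketch of the suggested style (not the PR's code; it assumes the re-export of `ShardedKeyType` has been added to `apache_beam/typehints/__init__.py` as described above):
   ```python
   # Illustrative sketch (not the PR's code): import the public alias and
   # build hints from the typing module. typing.Tuple / typing.Iterable are
   # converted to their typehints equivalents internally, so the internal
   # *Constraint classes are never needed here.
   from typing import Iterable, Tuple

   from apache_beam.typehints import ShardedKeyType

   hint = Tuple[ShardedKeyType[str], Iterable[int]]
   ```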

##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -785,7 +788,6 @@ def expand(self, pcoll):
 
   @experimental()
   @typehints.with_input_types(Tuple[K, V])
-  @typehints.with_output_types(Tuple[K, Iterable[V]])

Review comment:
       Any reason why this isn't:
   ```python
     @typehints.with_output_types(Tuple[ShardedKeyType[K], Iterable[V]])
   ```
   ?
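   
   (For reference, a sketch of the full decorator stack under this suggestion; illustrative, not the PR's code. It assumes the `K`/`V` type variables and the `experimental` decorator already used in `util.py`, and shows the class top-level for brevity even though it is nested in `GroupIntoBatches`:)
   ```python
   # Illustrative sketch (not the PR's code) of the decorator stack with a
   # sharded-key output hint. K and V are the type variables from util.py.
   @experimental()
   @typehints.with_input_types(Tuple[K, V])
   @typehints.with_output_types(Tuple[ShardedKeyType[K], Iterable[V]])
   class WithShardedKey(PTransform):
     ...
   ```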
   

##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size, max_buffering_duration_secs=None):
     _shard_id_prefix = uuid.uuid4().bytes
 
     def expand(self, pcoll):
+      key_type, value_type = pcoll.element_type.tuple_types
       sharded_pcoll = pcoll | Map(
           lambda key_value: (
               ShardedKey(
                   key_value[0],
                   # Use [uuid, thread id] as the shard id.
                   GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
                       threading.get_ident().to_bytes(8, 'big'))),
-              key_value[1]))
+              key_value[1])).with_output_types(
+                  TupleConstraint([ShardedKeyType[key_type], value_type]))

Review comment:
    If you keep this line, I believe `value_type` should be `Iterable[value_type]`.
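   
   A sketch of the line with that change applied (illustrative only; `shard_fn` is a hypothetical stand-in for the lambda in the diff):
   ```python
   # Illustrative form of the suggestion (not the PR's final code): wrap the
   # value side in Iterable. shard_fn stands in for the lambda in the diff.
   sharded_pcoll = pcoll | Map(shard_fn).with_output_types(
       typehints.Tuple[ShardedKeyType[key_type],
                       typehints.Iterable[value_type]])
   ```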

##########
File path: sdks/python/apache_beam/typehints/sharded_key_type.py
##########
@@ -25,8 +25,12 @@
 from apache_beam.typehints.typehints import match_type_variables
 from apache_beam.utils.sharded_key import ShardedKey
 
+from future.utils import with_metaclass
 
-class ShardedKeyTypeConstraint(typehints.TypeConstraint):
+
+class ShardedKeyTypeConstraint(with_metaclass(typehints.GetitemConstructor,

Review comment:
       I don't know what `with_metaclass` does. You could add the `__getitem__` implementation directly if it's causing issues.
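   
   For example, a minimal sketch of adding item access directly (assuming Python 3.7+, where `__class_getitem__` handles `Cls[...]` subscription without any metaclass):
   ```python
   # Sketch (not the PR's code): Python 3.7+ invokes __class_getitem__ for
   # ShardedKeyTypeConstraint[K], so no with_metaclass shim is needed.
   class ShardedKeyTypeConstraint(typehints.TypeConstraint):
     def __init__(self, key_type):
       self.key_type = key_type

     def __class_getitem__(cls, key_type):
       # ShardedKeyTypeConstraint[int] -> ShardedKeyTypeConstraint(int)
       return cls(key_type)
   ```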
   

##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size, max_buffering_duration_secs=None):
     _shard_id_prefix = uuid.uuid4().bytes
 
     def expand(self, pcoll):
+      key_type, value_type = pcoll.element_type.tuple_types
       sharded_pcoll = pcoll | Map(
           lambda key_value: (
               ShardedKey(
                   key_value[0],
                   # Use [uuid, thread id] as the shard id.
                   GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
                       threading.get_ident().to_bytes(8, 'big'))),
-              key_value[1]))
+              key_value[1])).with_output_types(
+                  TupleConstraint([ShardedKeyType[key_type], value_type]))
       return (
           sharded_pcoll
           | GroupIntoBatches(self.batch_size, self.max_buffering_duration_secs))
 
+    def infer_output_type(self, input_type):

Review comment:
    If you keep the `with_output_types` decorator, please remove this `infer_output_type` override.
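
   (For illustration, an assumed sketch, not the PR's code, of such an override. A class-level `with_output_types(Tuple[ShardedKeyType[K], Iterable[V]])` already declares the same output type, which is why keeping both is redundant:)
   ```python
   # Assumed sketch (not the PR's code); typehints and ShardedKeyType are
   # already in scope in util.py. The class-level with_output_types decorator
   # declares the same output type, so keeping both would be redundant.
   def infer_output_type(self, input_type):
     key_type, value_type = input_type.tuple_types  # as in expand() above
     return typehints.Tuple[
         ShardedKeyType[key_type], typehints.Iterable[value_type]]
   ```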




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org