You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/15 02:36:03 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #15484: [BEAM-12769] Python support for directly using Java transforms using constructor and builder methods

chamikaramj commented on a change in pull request #15484:
URL: https://github.com/apache/beam/pull/15484#discussion_r708785353



##########
File path: sdks/python/apache_beam/transforms/external.py
##########
@@ -144,6 +147,101 @@ def _get_named_tuple_instance(self):
     return self._tuple_instance
 
 
+class JavaClassLookupPayloadBuilder(PayloadBuilder):
+  """
+  Builds a payload for directly instantiating a Java transform using a
+  constructor and builder methods.
+  """
+  def __init__(self, class_name):
+    """
+    :param class_name: fully qualified name of the transform class.
+    """
+    if not class_name:
+      raise ValueError('Class name must not be empty')
+
+    self._class_name = class_name
+    self._constructor_method = None
+    self._constructor_params = None
+    self._builder_methods_and_params = OrderedDict()
+
+  def _get_schema_proto_and_payload(self, **kwargs):
+    named_fields = []
+    for key, value in kwargs.items():
+      if not key:
+        raise ValueError('Parameter name cannot be empty')
+      if value is None:
+        raise ValueError(
+            'Received value None for key %s. None values are currently not '
+            'supported' % key)
+      named_fields.append(
+          (key, convert_to_typing_type(instance_to_type(value))))
+
+    schema_proto = named_fields_to_schema(named_fields)
+    row = named_tuple_from_schema(schema_proto)(**kwargs)
+    schema = named_tuple_to_schema(type(row))
+
+    payload = RowCoder(schema).encode(row)
+    return (schema_proto, payload)
+
+  def build(self):
+    constructor_params = self._constructor_params or {}
+    constructor_schema, constructor_payload = (
+        self._get_schema_proto_and_payload(**constructor_params))
+    payload = JavaClassLookupPayload(
+        class_name=self._class_name,
+        constructor_schema=constructor_schema,
+        constructor_payload=constructor_payload)
+    if self._constructor_method:
+      payload.constructor_method = self._constructor_method
+
+    for builder_method_name, params in self._builder_methods_and_params.items():
+      builder_method_schema, builder_method_payload = (
+          self._get_schema_proto_and_payload(**params))
+      builder_method = BuilderMethod(
+          name=builder_method_name,
+          schema=builder_method_schema,
+          payload=builder_method_payload)
+      builder_method.name = builder_method_name
+      payload.builder_methods.append(builder_method)
+    return payload
+
+  def add_constructor(self, **kwargs):

Review comment:
       Done. We already fail for more than one non-trivial invocation of with_constructor or with_constructor_method.

##########
File path: sdks/python/apache_beam/transforms/external.py
##########
@@ -144,6 +147,101 @@ def _get_named_tuple_instance(self):
     return self._tuple_instance
 
 
+class JavaClassLookupPayloadBuilder(PayloadBuilder):
+  """
+  Builds a payload for directly instantiating a Java transform using a
+  constructor and builder methods.
+  """
+  def __init__(self, class_name):
+    """
+    :param class_name: fully qualified name of the transform class.
+    """
+    if not class_name:
+      raise ValueError('Class name must not be empty')
+
+    self._class_name = class_name
+    self._constructor_method = None
+    self._constructor_params = None
+    self._builder_methods_and_params = OrderedDict()
+
+  def _get_schema_proto_and_payload(self, **kwargs):
+    named_fields = []
+    for key, value in kwargs.items():
+      if not key:
+        raise ValueError('Parameter name cannot be empty')
+      if value is None:
+        raise ValueError(
+            'Received value None for key %s. None values are currently not '
+            'supported' % key)
+      named_fields.append(
+          (key, convert_to_typing_type(instance_to_type(value))))
+
+    schema_proto = named_fields_to_schema(named_fields)
+    row = named_tuple_from_schema(schema_proto)(**kwargs)
+    schema = named_tuple_to_schema(type(row))
+
+    payload = RowCoder(schema).encode(row)
+    return (schema_proto, payload)
+
+  def build(self):
+    constructor_params = self._constructor_params or {}
+    constructor_schema, constructor_payload = (
+        self._get_schema_proto_and_payload(**constructor_params))
+    payload = JavaClassLookupPayload(
+        class_name=self._class_name,
+        constructor_schema=constructor_schema,
+        constructor_payload=constructor_payload)
+    if self._constructor_method:
+      payload.constructor_method = self._constructor_method
+
+    for builder_method_name, params in self._builder_methods_and_params.items():
+      builder_method_schema, builder_method_payload = (
+          self._get_schema_proto_and_payload(**params))
+      builder_method = BuilderMethod(
+          name=builder_method_name,
+          schema=builder_method_schema,
+          payload=builder_method_payload)
+      builder_method.name = builder_method_name
+      payload.builder_methods.append(builder_method)
+    return payload
+
+  def add_constructor(self, **kwargs):
+    """
+    Specifies the Java constructor to use.
+
+    :param kwargs: parameter names and values of the constructor.
+    """
+    if self._constructor_method or self._constructor_params:
+      raise ValueError(
+          'Constructor or constructor method can only be specified once')
+
+    self._constructor_params = kwargs
+
+  def add_constructor_method(self, method_name, **kwargs):
+    """
+    Specifies the Java constructor method to use.
+
+    :param method_name: name of the constructor method.
+    :param kwargs: parameter names and values of the constructor method.
+    """
+    if self._constructor_method or self._constructor_params:
+      raise ValueError(
+          'Constructor or constructor method can only be specified once')
+
+    self._constructor_method = method_name
+    self._constructor_params = kwargs
+
+  def add_builder_method(self, method_name, **kwargs):

Review comment:
       I think there are multiple issues with supporting that.
   
   (1) Java args have to be ordered, but Python requires positional args to be specified before kwargs. So we would somehow have to preserve the relative order between args and kwargs, or accept only one of the two. That will probably end up being confusing to users.
   (2) A schema requires field names to be specified. So we would have to define a special set of field names that get ignored by the Java side.
   
   Also, Java parameter names should already be known (for example, they are available in the Javadoc), so it should not be too difficult for Python users to specify them.
   
   WDYT?

##########
File path: sdks/python/apache_beam/portability/common_urns.py
##########
@@ -66,3 +67,5 @@
 requirements = StandardRequirements.Enum
 
 displayData = StandardDisplayData.DisplayData
+
+java_class_lookup = ExpansionMethods.JAVA_CLASS_LOOKUP

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org