Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/16 01:11:22 UTC

[GitHub] [beam] chamikaramj opened a new pull request #17101: PayloadBuilder for ExternalPythonTransform

chamikaramj opened a new pull request #17101:
URL: https://github.com/apache/beam/pull/17101


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj commented on a change in pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r840997369



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about the order of fields, but the order will
+  // not matter when applying kwargs on the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments map will be added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {
+    if (providedKwargsRow != null) {
+      return providedKwargsRow;
+    } else if (kwargsMap.size() == 0) {
+      return Row.nullRow(Schema.of());
+    } else {
+      Schema schema =
+          generateSchemaFromFieldValues(
+              kwargsMap.values().toArray(), kwargsMap.keySet().toArray(new String[] {}));
+      return Row.withSchema(schema)
+          .addValues(convertComplexTypesToRows(kwargsMap.values().toArray()))
+          .build();
+    }
+  }
+
+  private static boolean isCustomType(java.lang.Class<?> type) {

Review comment:
       Done.







[GitHub] [beam] chamikaramj commented on a change in pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r832449114



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +40,169 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.ByteBuddy;

Review comment:
       Do you see a downside to this approach? I would like to use an existing, proven utility here (POJO -> schema) instead of recursively and manually generating the schema. Otherwise we could end up with an implementation similar to the existing POJO -> schema conversion, and it might be hard to handle all the corner cases that conversion already handles.
   
   If you don't see a downside, can we get the API in with this implementation and look for an alternative implementation later (for example, if a second implementation turns out to be more efficient)?
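
The "POJO -> schema" idea discussed above can be illustrated with a minimal plain-Java sketch. It is not Beam's implementation: the class `PojoFieldsSketch` and the example POJO `Options` are hypothetical names invented here, and a real conversion utility (such as the `JavaFieldSchema` provider imported in the diff) handles many more cases, including nested and collection types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; this is not part of Beam. */
final class PojoFieldsSketch {

  /** Hypothetical example POJO with two public fields. */
  static final class Options {
    public String stringArg = "value";
    public int intArg = 100;
  }

  /**
   * Maps each public, non-static field name to its declared type. A TreeMap is used because
   * reflection does not guarantee any particular field order.
   */
  static Map<String, Class<?>> fieldTypes(Class<?> type) {
    Map<String, Class<?>> result = new TreeMap<>();
    for (Field field : type.getFields()) {
      if (!Modifier.isStatic(field.getModifiers())) {
        result.put(field.getName(), field.getType());
      }
    }
    return result;
  }

  public static void main(String[] unused) {
    System.out.println(fieldTypes(Options.class));
  }
}
```

The sketch stops at flat fields on purpose: the recursion and corner cases are exactly the part the comment above suggests leaving to an existing utility.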







[GitHub] [beam] robertwb commented on a change in pull request #17101: PayloadBuilder for ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r827542787



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PayloadBuilder.java
##########
@@ -0,0 +1,79 @@
+package org.apache.beam.runners.core.construction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.values.Row;
+
+
+// TODO: Move to sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/
+//  when https://github.com/apache/beam/pull/17035 is in.
+// TODO: Add unit tests.
+public class PayloadBuilder {
+
+  private Schema schema;
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private List<Object> args;
+  private Map<String, Object> kwargs;
+
+  private PayloadBuilder(Schema schema) {
+    this.schema = schema;
+    args = new ArrayList<>();
+    kwargs = new HashMap<>();
+  }
+
+  static PayloadBuilder fromSchema(Schema schema) {
+    return new PayloadBuilder(schema);
+  }
+
+  static PayloadBuilder fromType(Class<?> type) {
+    try {
+      return fromSchema(SCHEMA_REGISTRY.getSchema(type));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static PayloadBuilder fromJavaPojo(Object pojo) {

Review comment:
       Ideally, one could just do 
   
   ```
     row = PayloadBuilder.fromJavaPojo(pojo)
   ```
   
   rather than
   
   ```
     row = PayloadBuilder.fromJavaPojo(pojo).withArg(pojo.someArg).withArg(pojo.anotherArg).buildArgs()
   ```
   
   (and making sure that `someArg` and `anotherArg` were properly ordered). 
   
   I think perhaps we should structure this in terms of what a Pipeline author should write, e.g.
   
   ```
   PythonTransform.named("apache_beam.some.Transform").withArgs("foo", 3, ...)
   PythonTransform.named("apache_beam.some.Transform")
       .withKeywordArg("stringArg", "value")
       .withKeywordArg("intArg", 100)
       .withKeywordArgs(mapOfStringToObjectInferringTypes)
       .withKeywordArgs(rowOrMaybeEvenRegisteredPojo);
   ```
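
The fluent style proposed above could be backed by a builder along the following lines. This is only a sketch under stated assumptions: `PythonCallSketch` is a hypothetical stand-in written in plain Java, not the Beam `PythonTransform` or `ExternalPythonTransform` class, and it only accumulates arguments without building a `Row` or expanding a transform.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the proposed fluent API; not the Beam class. */
final class PythonCallSketch {
  private final String name;
  private final List<Object> args = new ArrayList<>();
  // A TreeMap keeps keyword names in a deterministic (sorted) order.
  private final Map<String, Object> kwargs = new TreeMap<>();

  private PythonCallSketch(String name) {
    this.name = name;
  }

  static PythonCallSketch named(String fullyQualifiedName) {
    return new PythonCallSketch(fullyQualifiedName);
  }

  /** Appends positional arguments; repeated calls accumulate in call order. */
  PythonCallSketch withArgs(Object... more) {
    Collections.addAll(args, more);
    return this;
  }

  /** Adds one keyword argument; a repeated key overwrites the earlier value. */
  PythonCallSketch withKeywordArg(String key, Object value) {
    kwargs.put(key, value);
    return this;
  }

  /** Adds every entry of the given map as a keyword argument. */
  PythonCallSketch withKeywordArgs(Map<String, ?> more) {
    kwargs.putAll(more);
    return this;
  }

  String name() {
    return name;
  }

  List<Object> args() {
    return Collections.unmodifiableList(args);
  }

  Map<String, Object> kwargs() {
    return Collections.unmodifiableMap(kwargs);
  }

  public static void main(String[] unused) {
    PythonCallSketch call =
        PythonCallSketch.named("apache_beam.some.Transform")
            .withArgs("foo", 3)
            .withKeywordArg("stringArg", "value")
            .withKeywordArg("intArg", 100);
    System.out.println(call.name() + " args=" + call.args() + " kwargs=" + call.kwargs());
  }
}
```

Returning `this` from every setter is what lets a pipeline author chain the calls in a single expression, as in the example in the comment.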







[GitHub] [beam] chamikaramj commented on pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17101:
URL: https://github.com/apache/beam/pull/17101#issuecomment-1074749809


   Thanks. PTAL.





[GitHub] [beam] chamikaramj commented on a change in pull request #17101: PayloadBuilder for ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r827549894



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PayloadBuilder.java
##########
@@ -0,0 +1,79 @@
+package org.apache.beam.runners.core.construction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.values.Row;
+
+
+// TODO: Move to sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/
+//  when https://github.com/apache/beam/pull/17035 is in.
+// TODO: Add unit tests.
+public class PayloadBuilder {
+
+  private Schema schema;
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private List<Object> args;
+  private Map<String, Object> kwargs;
+
+  private PayloadBuilder(Schema schema) {
+    this.schema = schema;
+    args = new ArrayList<>();
+    kwargs = new HashMap<>();
+  }
+
+  static PayloadBuilder fromSchema(Schema schema) {
+    return new PayloadBuilder(schema);
+  }
+
+  static PayloadBuilder fromType(Class<?> type) {
+    try {
+      return fromSchema(SCHEMA_REGISTRY.getSchema(type));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static PayloadBuilder fromJavaPojo(Object pojo) {

Review comment:
       Thanks. Will do updates after https://github.com/apache/beam/pull/17035 is in.







[GitHub] [beam] chamikaramj merged pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj merged pull request #17101:
URL: https://github.com/apache/beam/pull/17101


   





[GitHub] [beam] robertwb commented on a change in pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r840092488



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about the order of fields, but the order will
+  // not matter when applying kwargs on the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments map will be added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {
+    if (providedKwargsRow != null) {
+      return providedKwargsRow;
+    } else if (kwargsMap.size() == 0) {
+      return Row.nullRow(Schema.of());
+    } else {
+      Schema schema =
+          generateSchemaFromFieldValues(
+              kwargsMap.values().toArray(), kwargsMap.keySet().toArray(new String[] {}));
+      return Row.withSchema(schema)
+          .addValues(convertComplexTypesToRows(kwargsMap.values().toArray()))
+          .build();
+    }
+  }
+
+  private static boolean isCustomType(java.lang.Class<?> type) {
+    boolean val =
+        !(ClassUtils.isPrimitiveOrWrapper(type)
+            || type == String.class
+            || Row.class.isAssignableFrom(type));
+    return val;
+  }
+
+  // If the custom type has a registered schema, we use that. Otherwise we try to register it using
+  // 'JavaFieldSchema'.
+  private Row convertCustomValue(Object value) {
+    SerializableFunction<Object, Row> toRowFunc;
+    try {
+      toRowFunc =
+          (SerializableFunction<Object, Row>) SCHEMA_REGISTRY.getToRowFunction(value.getClass());
+    } catch (NoSuchSchemaException e) {
+      SCHEMA_REGISTRY.registerSchemaProvider(value.getClass(), new JavaFieldSchema());
+      try {
+        toRowFunc =
+            (SerializableFunction<Object, Row>) SCHEMA_REGISTRY.getToRowFunction(value.getClass());
+      } catch (NoSuchSchemaException e1) {
+        throw new RuntimeException(e1);
+      }
+    }
+    return toRowFunc.apply(value);
+  }
+
+  private Object[] convertComplexTypesToRows(Object[] values) {
+    Object[] converted = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      converted[i] = isCustomType(value.getClass()) ? convertCustomValue(value) : value;
+    }
+    return converted;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetArgsRow() {
+    if (argsArray.length == 0) {

Review comment:
       Ideally generateSchemaFromFieldValues should handle the degenerate empty case just fine, so we don't need to add a special case. (Similarly above for the kwargs.)
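
The point about the degenerate case can be sketched in plain Java. The helper below is hypothetical: `EmptyCaseSketch.describe` is not the PR's `generateSchemaFromFieldValues`, and it returns a simple name-to-type map instead of a Beam `Schema`. It only shows that a loop over the inputs already yields an empty result for empty inputs, so no separate branch is needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not the helper from the PR. */
final class EmptyCaseSketch {

  /**
   * Builds an ordered field-name to runtime-type map. With zero values the loop body never
   * runs and an empty map is returned, so callers need no special case for empty input.
   */
  static Map<String, Class<?>> describe(Object[] values, String[] names) {
    if (values.length != names.length) {
      throw new IllegalArgumentException("values and names must have the same length");
    }
    Map<String, Class<?>> fields = new LinkedHashMap<>();
    for (int i = 0; i < values.length; i++) {
      fields.put(names[i], values[i].getClass());
    }
    return fields;
  }

  public static void main(String[] unused) {
    System.out.println(describe(new Object[] {}, new String[] {}));
    System.out.println(describe(new Object[] {"a", 1}, new String[] {"x", "y"}));
  }
}
```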

##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about the order of fields, but the order will
+  // not matter when applying kwargs on the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments map will be added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {
+    if (providedKwargsRow != null) {
+      return providedKwargsRow;
+    } else if (kwargsMap.size() == 0) {
+      return Row.nullRow(Schema.of());
+    } else {
+      Schema schema =
+          generateSchemaFromFieldValues(
+              kwargsMap.values().toArray(), kwargsMap.keySet().toArray(new String[] {}));
+      return Row.withSchema(schema)
+          .addValues(convertComplexTypesToRows(kwargsMap.values().toArray()))
+          .build();
+    }
+  }
+
+  private static boolean isCustomType(java.lang.Class<?> type) {

Review comment:
       I'm not sure what is meant by "custom" here. 

##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about the order of fields, but the order will
+  // not matter when applying kwargs on the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments are added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {

Review comment:
       kvargs -> kwargs (v -> w)

##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about order of fields but the order will not
+  // matter when applying kwargs at the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments are added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);

Review comment:
       Should we check that `providedKwargsRow != null` here too? 
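
       A minimal plain-Java sketch of what guarding both map-based methods could look like. `KwargsBuilderSketch`, `withKwargsRow`, and `checkNoRowProvided` are hypothetical names used only for illustration, and a plain `Object` stands in for Beam's `Row` so the sketch has no dependencies; it is not the actual `ExternalPythonTransform` code.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Simplified stand-in for the builder under review; not the Beam class itself.
final class KwargsBuilderSketch {
  private final SortedMap<String, Object> kwargsMap = new TreeMap<>();
  // Stand-in for the Row-based kwargs (assumption: any non-null object means "Row provided").
  private Object providedKwargsRow = null;

  // Shared guard so every map-based entry point rejects mixing with a Row.
  private void checkNoRowProvided() {
    if (providedKwargsRow != null) {
      throw new IllegalArgumentException(
          "Kwargs were specified both directly and as a Row object");
    }
  }

  KwargsBuilderSketch withKwarg(String name, Object value) {
    checkNoRowProvided();
    kwargsMap.put(name, value);
    return this;
  }

  KwargsBuilderSketch withKwargs(Map<String, Object> kwargs) {
    checkNoRowProvided();
    kwargsMap.putAll(kwargs);
    return this;
  }

  // Named differently from withKwargs only because Object would make the overload ambiguous here.
  KwargsBuilderSketch withKwargsRow(Object row) {
    if (!kwargsMap.isEmpty()) {
      throw new IllegalArgumentException(
          "Kwargs were specified both directly and as a Row object");
    }
    providedKwargsRow = row;
    return this;
  }

  SortedMap<String, Object> kwargs() {
    return kwargsMap;
  }
}
```

       With the guard factored out, calling the map overload after a Row has been supplied fails in the same way as the single-kwarg method does.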

##########
File path: sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java
##########
@@ -27,23 +30,218 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class ExternalPythonTransformTest implements Serializable {
+
   @Test
   public void trivialPythonTransform() {
     Pipeline p = Pipeline.create();
     PCollection<String> output =
         p.apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z")))
             .apply(
-                new ExternalPythonTransform<
-                    PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>(
-                    "apache_beam.GroupByKey", Row.nullRow(Schema.of()), Row.nullRow(Schema.of())))
+                ExternalPythonTransform
+                    .<PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>
+                        from("apache_beam.GroupByKey"))
             .apply(MapElements.into(TypeDescriptors.strings()).via(kv -> kv.getKey()));
     PAssert.that(output).containsInAnyOrder("A", "B");
     // TODO: Run this on a multi-language supporting runner.
   }
+
+  @Test
+  public void generateArgsWithPrimitives() {
+    ExternalPythonTransform<?, ?> transform =
+        ExternalPythonTransform
+            .<PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>from(
+                "DummyTransform")
+            .withArgs("aaa", "bbb", 11, 12L, 15.6, true);
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("field0")
+            .addStringField("field1")
+            .addInt32Field("field2")
+            .addInt64Field("field3")
+            .addDoubleField("field4")
+            .addBooleanField("field5")
+            .build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema).addValues("aaa", "bbb", 11, 12L, 15.6, true).build();
+
+    Row receivedRow = transform.buildOrGetArgsRow();
+    assertEquals(expectedRow, receivedRow);
+  }
+
+  @Test
+  public void generateArgsWithRow() {
+    Schema subRowSchema1 =
+        Schema.builder().addStringField("field0").addInt32Field("field1").build();
+    Row rowField1 = Row.withSchema(subRowSchema1).addValues("xxx", 123).build();
+    Schema subRowSchema2 =
+        Schema.builder()
+            .addDoubleField("field0")
+            .addBooleanField("field1")
+            .addStringField("field2")
+            .build();
+    Row rowField2 = Row.withSchema(subRowSchema2).addValues(12.5, true, "yyy").build();
+
+    ExternalPythonTransform<?, ?> transform =
+        ExternalPythonTransform
+            .<PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>from(
+                "DummyTransform")
+            .withArgs(rowField1, rowField2);
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addRowField("field0", subRowSchema1)
+            .addRowField("field1", subRowSchema2)
+            .build();
+    Row expectedRow = Row.withSchema(expectedSchema).addValues(rowField1, rowField2).build();
+
+    Row receivedRow = transform.buildOrGetArgsRow();
+    assertEquals(expectedRow, receivedRow);
+  }
+
+  static class CustomType {
+

Review comment:
       Remove newline (unless spotless insists). 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on a change in pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r832435659



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +40,169 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.ByteBuddy;

Review comment:
       Using ByteBuddy to create a class dynamically from which we can infer the schema seems overkill if we have a set of (type, name)s. Could we use StaticSchemaInference.fieldFromType directly instead? 







[GitHub] [beam] chamikaramj commented on pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17101:
URL: https://github.com/apache/beam/pull/17101#issuecomment-1082214581


   (blocked by https://issues.apache.org/jira/browse/BEAM-14148)





[GitHub] [beam] chamikaramj commented on pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17101:
URL: https://github.com/apache/beam/pull/17101#issuecomment-1082213860


   Thanks. PTAL.





[GitHub] [beam] chamikaramj commented on a change in pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r837766566



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +40,169 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.ByteBuddy;

Review comment:
       Updated to use provided field types directly instead of using ByteBuddy.







[GitHub] [beam] chamikaramj commented on a change in pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r840995481



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about order of fields but the order will not
+  // matter when applying kwargs at the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments are added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {
+    if (providedKwargsRow != null) {
+      return providedKwargsRow;
+    } else if (kwargsMap.size() == 0) {
+      return Row.nullRow(Schema.of());
+    } else {
+      Schema schema =
+          generateSchemaFromFieldValues(
+              kwargsMap.values().toArray(), kwargsMap.keySet().toArray(new String[] {}));
+      return Row.withSchema(schema)
+          .addValues(convertComplexTypesToRows(kwargsMap.values().toArray()))
+          .build();
+    }
+  }
+
+  private static boolean isCustomType(java.lang.Class<?> type) {

Review comment:
       Added a comment.

##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about order of fields but the order will not
+  // matter when applying kwargs at the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments are added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {
+    if (providedKwargsRow != null) {
+      return providedKwargsRow;
+    } else if (kwargsMap.size() == 0) {
+      return Row.nullRow(Schema.of());
+    } else {
+      Schema schema =
+          generateSchemaFromFieldValues(
+              kwargsMap.values().toArray(), kwargsMap.keySet().toArray(new String[] {}));
+      return Row.withSchema(schema)
+          .addValues(convertComplexTypesToRows(kwargsMap.values().toArray()))
+          .build();
+    }
+  }
+
+  private static boolean isCustomType(java.lang.Class<?> type) {
+    boolean val =
+        !(ClassUtils.isPrimitiveOrWrapper(type)
+            || type == String.class
+            || Row.class.isAssignableFrom(type));
+    return val;
+  }
+
+  // If the custom type has a registered schema, we use that. Otherwise we try to register it using
+  // 'JavaFieldSchema'.
+  private Row convertCustomValue(Object value) {
+    SerializableFunction<Object, Row> toRowFunc;
+    try {
+      toRowFunc =
+          (SerializableFunction<Object, Row>) SCHEMA_REGISTRY.getToRowFunction(value.getClass());
+    } catch (NoSuchSchemaException e) {
+      SCHEMA_REGISTRY.registerSchemaProvider(value.getClass(), new JavaFieldSchema());
+      try {
+        toRowFunc =
+            (SerializableFunction<Object, Row>) SCHEMA_REGISTRY.getToRowFunction(value.getClass());
+      } catch (NoSuchSchemaException e1) {
+        throw new RuntimeException(e1);
+      }
+    }
+    return toRowFunc.apply(value);
+  }
+
+  private Object[] convertComplexTypesToRows(Object[] values) {
+    Object[] converted = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      converted[i] = isCustomType(value.getClass()) ? convertCustomValue(value) : value;
+    }
+    return converted;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetArgsRow() {
+    if (argsArray.length == 0) {

Review comment:
       Updated and added tests.

##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about order of fields but the order will not
+  // matter when applying kwargs at the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the new keyword arguments are added to the previously provided keyword arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {

Review comment:
       Done.







[GitHub] [beam] robertwb commented on a change in pull request #17101: PayloadBuilder for ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r827543263



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PayloadBuilder.java
##########
@@ -0,0 +1,79 @@
+package org.apache.beam.runners.core.construction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.values.Row;
+
+
+// TODO: Move to sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/
+//  when https://github.com/apache/beam/pull/17035 is in.
+// TODO: Add unit tests.
+public class PayloadBuilder {
+
+  private Schema schema;
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private List<Object> args;
+  private Map<String, Object> kwargs;
+
+  private PayloadBuilder(Schema schema) {
+    this.schema = schema;
+    args = new ArrayList<>();
+    kwargs = new HashMap<>();
+  }
+
+  static PayloadBuilder fromSchema(Schema schema) {
+    return new PayloadBuilder(schema);
+  }
+
+  static PayloadBuilder fromType(Class<?> type) {
+    try {
+      return fromSchema(SCHEMA_REGISTRY.getSchema(type));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static PayloadBuilder fromJavaPojo(Object pojo) {

Review comment:
       This really boils down to what the user-friendly API for external transforms should be for Java (Python transforms or no). 







[GitHub] [beam] chamikaramj commented on pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17101:
URL: https://github.com/apache/beam/pull/17101#issuecomment-1074765847


   cc: @ihji 





[GitHub] [beam] robertwb commented on a change in pull request #17101: [BEAM-14143] Simplifies the ExternalPythonTransform API

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r832609421



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +40,169 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy.ByteBuddy;

Review comment:
       The downside is that it's far more complicated than it needs to be, and likely more brittle too (using ByteBuddy, invoking class loaders, etc.). It also accumulates garbage in global state (e.g. the classloader, schema registry). The POJO -> schema conversions are complicated because they handle all kinds of variants of how fields are named, annotations, private members vs. AutoValues vs. getters/setters, etc., none of which are relevant here.
   
   What about
   
   ```
     Schema schemaFromMap(Map<String, Object> fields) {
       Schema.Builder builder = Schema.builder();
       for (Map.Entry<String, Object> entry : fields.entrySet()) {
         builder.addField(
             entry.getKey(),
             StaticSchemaInference.fieldFromType(
                 TypeDescriptor.of(entry.getValue().getClass()),
                 JavaFieldSchema.JavaFieldTypeSupplier.INSTANCE));
       }
       return builder.build();
     }
   ```
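
   For readers without Beam on the classpath, the same idea can be sketched in plain Java: walk the (name, value) pairs and derive each field's type from the value's runtime class. Everything here is a hypothetical stand-in (`SchemaFromMapSketch`, `Field`, `typeLabel`, and the string type labels); it only mirrors the shape of the suggestion above and is not the Beam `Schema` or `StaticSchemaInference` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Dependency-free illustration of inferring (name, type) pairs from a kwargs map.
final class SchemaFromMapSketch {

  // Hypothetical stand-in for a schema field: a name plus a type label.
  static final class Field {
    final String name;
    final String type;

    Field(String name, String type) {
      this.name = name;
      this.type = type;
    }
  }

  // Maps a value's runtime class to a type label; unsupported or null values are rejected.
  static String typeLabel(Object value) {
    if (value == null) {
      throw new IllegalArgumentException("Cannot infer a type from a null value");
    }
    if (value instanceof String) return "STRING";
    if (value instanceof Integer) return "INT32";
    if (value instanceof Long) return "INT64";
    if (value instanceof Double) return "DOUBLE";
    if (value instanceof Boolean) return "BOOLEAN";
    throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
  }

  // Builds the field list in sorted key order so the result is deterministic.
  static List<Field> schemaFromMap(Map<String, Object> fields) {
    List<Field> schema = new ArrayList<>();
    for (Map.Entry<String, Object> entry : new TreeMap<>(fields).entrySet()) {
      schema.add(new Field(entry.getKey(), typeLabel(entry.getValue())));
    }
    return schema;
  }
}
```

   Sorting the keys first keeps the inferred field order stable regardless of the map implementation the caller passes in.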



