Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/04/02 02:43:00 UTC

[jira] [Work logged] (BEAM-14143) ExternalPythonTransform API for easily invoking Python transforms from Java

     [ https://issues.apache.org/jira/browse/BEAM-14143?focusedWorklogId=751871&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-751871 ]

ASF GitHub Bot logged work on BEAM-14143:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Apr/22 02:42
            Start Date: 02/Apr/22 02:42
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r840995481



##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
+  private static boolean isCustomType(java.lang.Class<?> type) {

Review comment:
       Added a comment.
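For readers following the isCustomType helper: in this change it treats any value that is not a primitive or wrapper, a String, or a Row as a custom type, and convertComplexTypesToRows sends only those values through convertCustomValue. The following is a minimal, Beam-free sketch of that classification. It assumes a hypothetical RowLike marker interface in place of Row, a hand-written wrapper set in place of commons-lang3 ClassUtils.isPrimitiveOrWrapper, and a caller-supplied toRow function in place of convertCustomValue. Unlike the loop in the diff, it also checks for null before calling getClass().

```java
import java.util.Set;
import java.util.function.Function;

/** Hypothetical marker interface standing in for Beam's Row, so no Beam jars are needed. */
interface RowLike {}

/** Hypothetical sketch of the isCustomType / convertComplexTypesToRows pair from the diff. */
class ValueConversion {
  // Boxed primitive types; together with Class#isPrimitive this approximates
  // ClassUtils.isPrimitiveOrWrapper for the purposes of this sketch.
  private static final Set<Class<?>> WRAPPERS =
      Set.of(
          Boolean.class, Byte.class, Character.class, Short.class,
          Integer.class, Long.class, Float.class, Double.class);

  /** True for anything that is not a primitive/wrapper, a String, or a Row stand-in. */
  static boolean isCustomType(Class<?> type) {
    return !(type.isPrimitive()
        || WRAPPERS.contains(type)
        || type == String.class
        || RowLike.class.isAssignableFrom(type));
  }

  /** Converts custom-typed values with toRow; all other values, including nulls, pass through. */
  static Object[] convertComplexTypes(Object[] values, Function<Object, Object> toRow) {
    Object[] converted = new Object[values.length];
    for (int i = 0; i < values.length; i++) {
      Object value = values[i];
      converted[i] =
          (value != null && isCustomType(value.getClass())) ? toRow.apply(value) : value;
    }
    return converted;
  }
}
```

In this sketch a StringBuilder argument would be routed through toRow, while 1, "s", and null would be passed through unchanged.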

##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preserve the order here since Schemas care about the order of fields, but the order does
+  // not matter when kwargs are applied on the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private @Nullable Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given transform name.
+   *
+   * @param transformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return An {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String transformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(transformName);
+  }
+
+  /**
+   * Specifies positional arguments for the Python cross-language transform. If invoked more than
+   * once, new arguments are appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If invoked more than once,
+   * the entries of the new map are added to the previously provided keyword arguments.
+   *
+   * @param kwargs map from keyword argument name to value.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a {@link Row} object.
+   *
+   * @param kwargs keyword arguments as a {@link Row} object. An empty Row represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {
+    if (providedKwargsRow != null) {
+      return providedKwargsRow;
+    } else if (kwargsMap.size() == 0) {
+      return Row.nullRow(Schema.of());
+    } else {
+      Schema schema =
+          generateSchemaFromFieldValues(
+              kwargsMap.values().toArray(), kwargsMap.keySet().toArray(new String[] {}));
+      return Row.withSchema(schema)
+          .addValues(convertComplexTypesToRows(kwargsMap.values().toArray()))
+          .build();
+    }
+  }
+
+  private static boolean isCustomType(java.lang.Class<?> type) {
+    boolean val =
+        !(ClassUtils.isPrimitiveOrWrapper(type)
+            || type == String.class
+            || Row.class.isAssignableFrom(type));
+    return val;
+  }
+
+  // If the custom type has a registered schema, we use that. Otherwise we try to register a schema
+  // for it using 'JavaFieldSchema'.
+  private Row convertCustomValue(Object value) {
+    SerializableFunction<Object, Row> toRowFunc;
+    try {
+      toRowFunc =
+          (SerializableFunction<Object, Row>) SCHEMA_REGISTRY.getToRowFunction(value.getClass());
+    } catch (NoSuchSchemaException e) {
+      SCHEMA_REGISTRY.registerSchemaProvider(value.getClass(), new JavaFieldSchema());
+      try {
+        toRowFunc =
+            (SerializableFunction<Object, Row>) SCHEMA_REGISTRY.getToRowFunction(value.getClass());
+      } catch (NoSuchSchemaException e1) {
+        throw new RuntimeException(e1);
+      }
+    }
+    return toRowFunc.apply(value);
+  }
+
+  private Object[] convertComplexTypesToRows(Object[] values) {
+    Object[] converted = new Object[values.length];
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      converted[i] = isCustomType(value.getClass()) ? convertCustomValue(value) : value;
+    }
+    return converted;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetArgsRow() {
+    if (argsArray.length == 0) {

Review comment:
       Updated and added tests.
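withArgs in this change accumulates positional arguments: each call copies argsArray into a larger array with Arrays.copyOf and appends the new values with System.arraycopy, so repeated calls extend the list rather than replace it. Below is a minimal standalone sketch of just that behavior, using a hypothetical ArgsAccumulator class that is not part of Beam.

```java
import java.util.Arrays;

/** Hypothetical stand-in that reproduces only the append logic of withArgs(Object...). */
class ArgsAccumulator {
  private Object[] argsArray = new Object[] {};

  ArgsAccumulator withArgs(Object... args) {
    // Copy the existing arguments into a larger array, then copy the new ones into the tail.
    Object[] result = Arrays.copyOf(argsArray, argsArray.length + args.length);
    System.arraycopy(args, 0, result, argsArray.length, args.length);
    argsArray = result;
    return this; // fluent, like the wrapper in the diff
  }

  /** Returns a defensive copy so callers cannot mutate the accumulated arguments. */
  Object[] args() {
    return argsArray.clone();
  }
}
```

For example, new ArgsAccumulator().withArgs("x", 1).withArgs(2.5).args() yields {"x", 1, 2.5}. Copying the whole array on every call is linear in the number of arguments, which should be negligible for short argument lists.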

##########
File path: sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {

Review comment:
       Done.
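The kwargs setters in this change keep names in a TreeMap, so a schema generated from kwargsMap has a deterministic field order, and they reject mixing name/value kwargs with a prebuilt Row. As written in the diff, withKwarg(String, Object) checks providedKwargsRow but the withKwargs(Map) overload does not. Below is a minimal sketch of the same bookkeeping with the guard applied to both entry points. It assumes a hypothetical KwargsBuilder class, uses Object as a stand-in for Row so it compiles without Beam, and names the Row variant withKwargsRow (a hypothetical name) so it cannot be confused with the Map overload.

```java
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch of the kwargs bookkeeping; Object stands in for Beam's Row. */
class KwargsBuilder {
  // Sorted by name, so the field order of a schema generated from it is deterministic.
  private final SortedMap<String, Object> kwargsMap = new TreeMap<>();
  private Object providedKwargsRow = null;

  KwargsBuilder withKwarg(String name, Object value) {
    checkNoRowProvided();
    kwargsMap.put(name, value);
    return this;
  }

  KwargsBuilder withKwargs(Map<String, Object> kwargs) {
    checkNoRowProvided(); // the diff's Map overload skips this check
    kwargsMap.putAll(kwargs);
    return this;
  }

  KwargsBuilder withKwargsRow(Object row) {
    if (!kwargsMap.isEmpty()) {
      throw new IllegalArgumentException(
          "Kwargs were specified both directly and as a Row object");
    }
    providedKwargsRow = row;
    return this;
  }

  SortedMap<String, Object> kwargs() {
    return Collections.unmodifiableSortedMap(kwargsMap);
  }

  private void checkNoRowProvided() {
    if (providedKwargsRow != null) {
      throw new IllegalArgumentException(
          "Kwargs were specified both directly and as a Row object");
    }
  }
}
```

With this sketch, calling withKwarg("zeta", 1) and then withKwargs with a map containing "alpha" leaves the keys ordered alpha, zeta, and supplying a Row first makes any later map-based call throw IllegalArgumentException.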




-- 
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 751871)
    Time Spent: 1h 40m  (was: 1.5h)

> ExternalPythonTransform API for easily invoking Python transforms from Java
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-14143
>                 URL: https://issues.apache.org/jira/browse/BEAM-14143
>             Project: Beam
>          Issue Type: New Feature
>          Components: cross-language
>            Reporter: Chamikara Madhusanka Jayalath
>            Priority: P2
>             Fix For: 2.39.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)