You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/29 22:11:21 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #15343: [BEAM-12769] Adds support for expanding a Java cross-language transform using the class name and builder methods

chamikaramj commented on a change in pull request #15343:
URL: https://github.com/apache/beam/pull/15343#discussion_r698043911



##########
File path: sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceOptions.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.expansion.service;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+public interface ExpansionServiceOptions extends PipelineOptions {
+  @Description("Allow list file for Java class based transform expansion")
+  @Default.String("")
+  String getJavaClassLookupAllowlist();

Review comment:
       Done. I left the example allow-list file and it's usage in tests for now (through the second pipeline option) instead of injecting an options object. I think that serves as a good example for now.

##########
File path: sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.expansion.service;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.BuilderMethod;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.JavaClassLookupPayload;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.Parameter;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.PayloadTypeUrns;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
+import org.apache.beam.sdk.expansion.service.ExpansionService.ExternalTransformRegistrarLoader;
+import org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A transform provider that can be used to directly instantiate a transform using Java class name
+ * and builder methods.
+ *
+ * @param <InputT> input {@link PInput} type of the transform
+ * @param <OutputT> output {@link POutput} type of the transform
+ */
+@SuppressWarnings({"rawtypes", "argument.type.incompatible", "assignment.type.incompatible"})
+@SuppressFBWarnings("UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD")
+class JavaClassLookupTransformProvider<InputT extends PInput, OutputT extends POutput>
+    implements TransformProvider<PInput, POutput> {
+
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  @Nullable AllowList allowList;
+
+  public JavaClassLookupTransformProvider(String allowListFile) {
+    if (!allowListFile.isEmpty()) {
+      ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+      File allowListFileObj = new File(allowListFile);
+      if (!allowListFileObj.exists()) {
+        throw new IllegalArgumentException("Allow list file " + allowListFile + " does not exist");
+      }
+      try {
+        allowList = mapper.readValue(allowListFileObj, AllowList.class);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "Could not load the provided allowlist file " + allowListFile, e);
+      }
+    }
+  }
+
+  @Override
+  public PTransform<PInput, POutput> getTransform(FunctionSpec spec) {
+    JavaClassLookupPayload payload = null;
+    try {
+      payload = JavaClassLookupPayload.parseFrom(spec.getPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          "Invalid payload type for URN " + getUrn(PayloadTypeUrns.Enum.JAVA_CLASS_LOOKUP), e);
+    }
+
+    String className = payload.getClassName();
+    try {
+      AllowedClass allowlistClass = null;
+      if (this.allowList != null) {
+        for (AllowedClass cls : this.allowList.getAllowedClasses()) {
+          if (cls.getClassName().equals(className)) {
+            if (allowlistClass != null) {
+              throw new IllegalArgumentException(
+                  "Found two matching allowlist classes " + allowlistClass + " and " + cls);
+            }
+            allowlistClass = cls;
+          }
+        }
+      }
+      if (allowlistClass == null) {
+        throw new UnsupportedOperationException(
+            "Expanding a transform class by the name " + className + " is not allowed.");
+      }
+      Class transformClass = Class.forName(className);
+      PTransform<PInput, POutput> transform;
+      if (payload.getConstructorMethod().isEmpty()) {
+        Constructor[] constructors = transformClass.getConstructors();
+        Constructor constructor = findMappingConstructor(constructors, payload);
+        Object[] parameterValues =
+            getParameterValues(
+                constructor.getParameters(),
+                payload.getConstructorParametersList().toArray(new Parameter[0]));
+        transform = (PTransform<PInput, POutput>) constructor.newInstance(parameterValues);
+      } else {
+        Method[] methods = transformClass.getMethods();
+        Method method = findMappingConstructorMethod(methods, payload, allowlistClass);
+        Object[] parameterValues =
+            getParameterValues(
+                method.getParameters(),
+                payload.getConstructorParametersList().toArray(new Parameter[0]));
+        transform = (PTransform<PInput, POutput>) method.invoke(null /* static */, parameterValues);
+      }
+      return applyBuilderMethods(transform, payload, allowlistClass);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Could not find class " + className, e);
+    } catch (InstantiationException
+        | IllegalArgumentException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      throw new IllegalArgumentException("Could not instantiate class " + className, e);
+    }
+  }
+
+  private PTransform<PInput, POutput> applyBuilderMethods(
+      PTransform<PInput, POutput> transform,
+      JavaClassLookupPayload payload,
+      AllowedClass allowListClass) {
+    for (BuilderMethod builderMethod : payload.getBuilderMethodsList()) {
+      Method method = getMethod(transform, builderMethod, allowListClass);
+      try {
+        transform =
+            (PTransform<PInput, POutput>)
+                method.invoke(
+                    transform,
+                    getParameterValues(
+                        method.getParameters(),
+                        builderMethod.getParameterList().toArray(new Parameter[0])));
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new IllegalArgumentException(
+            "Could not invoke the builder method "
+                + builderMethod
+                + " on transform "
+                + transform
+                + " with parameters "
+                + builderMethod.getParameterList(),
+            e);
+      }
+    }
+
+    return transform;
+  }
+
+  private boolean isBuilderMethodForName(
+      Method method, String nameFromPayload, AllowedClass allowListClass) {
+    // Lookup based on method annotations
+    for (Annotation annotation : method.getAnnotations()) {
+      if (annotation instanceof MultiLanguageBuilderMethod) {
+        if (nameFromPayload.equals(((MultiLanguageBuilderMethod) annotation).name())) {
+          if (allowListClass.getAllowedBuilderMethods().contains(nameFromPayload)) {
+            return true;
+          } else {
+            throw new RuntimeException(
+                "Builder method " + nameFromPayload + " has to be explicitly allowed");
+          }
+        }
+      }
+    }
+
+    // Lookup based on the method name.
+    boolean match = method.getName().equals(nameFromPayload);
+    String consideredMethodName = method.getName();
+
+    // We provide a simplification for common Java builder pattern naming convention where builder
+    // methods start with "with". In this case, for a builder method name in the form "withXyz",
+    // users may just use "xyz". If additional updates to the method name are needed the transform
+    // has to be updated by adding annotations.
+    if (!match && consideredMethodName.length() > 4 && consideredMethodName.startsWith("with")) {
+      consideredMethodName =
+          consideredMethodName.substring(4, 5).toLowerCase() + consideredMethodName.substring(5);
+      match = consideredMethodName.equals(nameFromPayload);
+    }
+    if (match && !allowListClass.getAllowedBuilderMethods().contains(consideredMethodName)) {
+      throw new RuntimeException(
+          "Builder method name " + consideredMethodName + " has to be explicitly allowed");
+    }
+    return match;
+  }
+
+  private Method getMethod(
+      PTransform<PInput, POutput> transform,
+      BuilderMethod builderMethod,
+      AllowedClass allowListClass) {
+    List<Method> matchingMethods =
+        Arrays.stream(transform.getClass().getMethods())
+            .filter(m -> isBuilderMethodForName(m, builderMethod.getName(), allowListClass))
+            .filter(
+                m ->
+                    parametersCompatible(
+                        m.getParameters(),
+                        builderMethod.getParameterList().toArray(new Parameter[0])))
+            .filter(m -> PTransform.class.isAssignableFrom(m.getReturnType()))
+            .collect(Collectors.toList());
+
+    if (matchingMethods.size() != 1) {
+      throw new RuntimeException(
+          "Expected to find exact one matching method in transform "
+              + transform
+              + " for BuilderMethod"
+              + builderMethod
+              + " but found "
+              + matchingMethods.size());
+    }
+    return matchingMethods.get(0);
+  }
+
+  private static boolean isPrimitiveOrWrapperOrString(java.lang.Class<?> type) {
+    return ClassUtils.isPrimitiveOrWrapper(type) || type == String.class;
+  }
+
+  private boolean parametersCompatible(
+      java.lang.reflect.Parameter[] methodParameters, Parameter[] payloadParameters) {
+    if (methodParameters.length != payloadParameters.length) {
+      return false;
+    }
+
+    for (int i = 0; i < methodParameters.length; i++) {
+      java.lang.reflect.Parameter parameterFromReflection = methodParameters[i];
+      Parameter parameterFromPayload = payloadParameters[i];
+
+      String paramNameFromReflection = parameterFromReflection.getName();
+      if (!paramNameFromReflection.startsWith("arg")
+          && !paramNameFromReflection.equals(parameterFromPayload.getName())) {
+        // Parameter name through reflection is from the class file (not through synthesizing,
+        // hence we can validate names)
+        return false;
+      }
+
+      Class parameterClass = parameterFromReflection.getType();
+      Row parameterRow =
+          ExternalTransformRegistrarLoader.decodeRow(
+              parameterFromPayload.getSchema(), parameterFromPayload.getPayload());
+
+      Schema parameterSchema = null;
+      if (isPrimitiveOrWrapperOrString(parameterClass)) {
+        if (parameterRow.getFieldCount() != 1) {
+          throw new RuntimeException(
+              "Expected a row for a single primitive field but received " + parameterRow);
+        }
+        // We get the value just for validation here.
+        getPrimitiveValueFromRow(parameterRow);
+      } else {
+        try {
+          parameterSchema = SCHEMA_REGISTRY.getSchema(parameterClass);
+        } catch (NoSuchSchemaException e) {
+
+          SCHEMA_REGISTRY.registerSchemaProvider(parameterClass, new JavaFieldSchema());
+          try {
+            parameterSchema = SCHEMA_REGISTRY.getSchema(parameterClass);
+          } catch (NoSuchSchemaException e1) {
+            throw new RuntimeException(e1);
+          }
+          if (parameterSchema != null && parameterSchema.getFieldCount() == 0) {
+            throw new RuntimeException(
+                "Could not determine a valid schema for parameter class " + parameterClass);
+          }
+        }
+      }
+
+      if (parameterSchema != null && !parameterRow.getSchema().assignableTo(parameterSchema)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private Object[] getParameterValues(
+      java.lang.reflect.Parameter[] parameters, Parameter[] payloadParameters) {
+    ArrayList<Object> parameterValues = new ArrayList<>();
+    int i = 0;
+    for (java.lang.reflect.Parameter parameter : parameters) {
+      Parameter parameterConfig = payloadParameters[i];
+      Class parameterClass = parameter.getType();
+
+      Row parameterRow =
+          ExternalTransformRegistrarLoader.decodeRow(
+              parameterConfig.getSchema(), parameterConfig.getPayload());
+
+      Object parameterValue = null;
+      if (isPrimitiveOrWrapperOrString(parameterClass)) {
+        parameterValue = getPrimitiveValueFromRow(parameterRow);
+      } else {
+        SerializableFunction<Row, Object> fromRowFunc = null;
+        // SCHEMA_REGISTRY.
+        try {
+          fromRowFunc = SCHEMA_REGISTRY.getFromRowFunction(parameterClass);
+        } catch (NoSuchSchemaException e) {
+          throw new IllegalArgumentException(
+              "Could not determine the row function for class " + parameterClass, e);
+        }
+        parameterValue = fromRowFunc.apply(parameterRow);
+      }
+      parameterValues.add(parameterValue);
+      i++;
+    }
+
+    return parameterValues.toArray();
+  }
+
+  private Object getPrimitiveValueFromRow(Row parameterRow) {
+    if (parameterRow.getFieldCount() != 1) {
+      throw new IllegalArgumentException(
+          "Expected a Row that contains a single field but received " + parameterRow);
+    }
+
+    @NonNull Object value = parameterRow.getValue(0);
+    if (!isPrimitiveOrWrapperOrString(value.getClass())) {
+      throw new IllegalArgumentException("Expected a Java primitive value but received " + value);
+    }
+    return value;
+  }
+
+  private Constructor findMappingConstructor(

Review comment:
       I looked into this a bit. I think we currently cannot use these utilities since we cannot derive the Java type from schema+row. We currently check if two given parameters values are compatible using "Schema.assignableTo()" utility. Trying to invoke different methods/constructors with available values might work but this seems risky and error prone. Lemme know if if you see a better way.

##########
File path: sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProvider.java
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.expansion.service;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.BuilderMethod;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.JavaClassLookupPayload;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.Parameter;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.PayloadTypeUrns;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
+import org.apache.beam.sdk.expansion.service.ExpansionService.ExternalTransformRegistrarLoader;
+import org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A transform provider that can be used to directly instantiate a transform using Java class name
+ * and builder methods.
+ *
+ * @param <InputT> input {@link PInput} type of the transform
+ * @param <OutputT> output {@link POutput} type of the transform
+ */
+@SuppressWarnings({"rawtypes", "argument.type.incompatible", "assignment.type.incompatible"})
+@SuppressFBWarnings("UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD")
+class JavaClassLookupTransformProvider<InputT extends PInput, OutputT extends POutput>
+    implements TransformProvider<PInput, POutput> {
+
+  private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
+  @Nullable AllowList allowList;
+
+  public JavaClassLookupTransformProvider(String allowListFile) {
+    if (!allowListFile.isEmpty()) {
+      ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+      File allowListFileObj = new File(allowListFile);
+      if (!allowListFileObj.exists()) {
+        throw new IllegalArgumentException("Allow list file " + allowListFile + " does not exist");
+      }
+      try {
+        allowList = mapper.readValue(allowListFileObj, AllowList.class);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "Could not load the provided allowlist file " + allowListFile, e);
+      }
+    }
+  }
+
+  @Override
+  public PTransform<PInput, POutput> getTransform(FunctionSpec spec) {
+    JavaClassLookupPayload payload = null;
+    try {
+      payload = JavaClassLookupPayload.parseFrom(spec.getPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          "Invalid payload type for URN " + getUrn(PayloadTypeUrns.Enum.JAVA_CLASS_LOOKUP), e);
+    }
+
+    String className = payload.getClassName();
+    try {
+      AllowedClass allowlistClass = null;
+      if (this.allowList != null) {
+        for (AllowedClass cls : this.allowList.getAllowedClasses()) {
+          if (cls.getClassName().equals(className)) {
+            if (allowlistClass != null) {
+              throw new IllegalArgumentException(
+                  "Found two matching allowlist classes " + allowlistClass + " and " + cls);
+            }
+            allowlistClass = cls;
+          }
+        }
+      }
+      if (allowlistClass == null) {
+        throw new UnsupportedOperationException(
+            "Expanding a transform class by the name " + className + " is not allowed.");
+      }
+      Class transformClass = Class.forName(className);

Review comment:
       Done.

##########
File path: sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaCLassLookupTransformProviderTest.java
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.expansion.service;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+import org.apache.beam.model.expansion.v1.ExpansionApi;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.BuilderMethod;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.PayloadTypeUrns;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+/** Tests for {@link JavaCLassLookupTransformProvider}. */
+@SuppressWarnings({
+  "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+})
+public class JavaCLassLookupTransformProviderTest {

Review comment:
       Done.

##########
File path: sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -370,11 +374,16 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
 
   private @MonotonicNonNull Map<String, TransformProvider> registeredTransforms;
   private final PipelineOptions pipelineOptions;
+  private String allowlistFile = "";

Review comment:
       Done.

##########
File path: sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
##########
@@ -370,11 +374,16 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
 
   private @MonotonicNonNull Map<String, TransformProvider> registeredTransforms;
   private final PipelineOptions pipelineOptions;
+  private String allowlistFile = "";
 
   public ExpansionService() {
     this(new String[] {});
   }
 
+  void setAllowlistFile(String allowlistFile) {

Review comment:
       Done.

##########
File path: sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaCLassLookupTransformProviderTest.java
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.expansion.service;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+import org.apache.beam.model.expansion.v1.ExpansionApi;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.BuilderMethod;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.PayloadTypeUrns;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+/** Tests for {@link JavaCLassLookupTransformProvider}. */
+@SuppressWarnings({
+  "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)

Review comment:
       Done.

##########
File path: sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaCLassLookupTransformProviderTest.java
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.expansion.service;
+
+import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+import org.apache.beam.model.expansion.v1.ExpansionApi;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.BuilderMethod;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.PayloadTypeUrns;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+/** Tests for {@link JavaCLassLookupTransformProvider}. */
+@SuppressWarnings({
+  "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+})
+public class JavaCLassLookupTransformProviderTest {
+
+  private static final String TEST_URN = "test:beam:transforms:count";
+
+  private static final String TEST_NAME = "TestName";
+
+  private static final String TEST_NAMESPACE = "namespace";
+
+  private ExpansionService expansionService = new ExpansionService();
+
+  public static class DummyTransform extends PTransform<PBegin, PCollection<String>> {
+
+    String strField1;
+    String strField2;
+    int intField1;
+
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      return input
+          .apply(Create.of("aaa", "bbb", "ccc"))
+          .apply(
+              ParDo.of(
+                  new DoFn<String, String>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) {
+                      c.output(c.element() + strField1);
+                    }
+                  }));
+    }
+  }
+
+  public static class DummyTransformWithConstructor extends DummyTransform {
+
+    public DummyTransformWithConstructor(String strField1) {
+      this.strField1 = strField1;
+    }
+  }
+
+  public static class DummyTransformWithConstructorAndBuilderMethods extends DummyTransform {
+
+    public DummyTransformWithConstructorAndBuilderMethods(String strField1) {
+      this.strField1 = strField1;
+    }
+
+    public DummyTransformWithConstructorAndBuilderMethods withStrField2(String strField2) {
+      this.strField2 = strField2;
+      return this;
+    }
+
+    public DummyTransformWithConstructorAndBuilderMethods withIntField1(int intField1) {
+      this.intField1 = intField1;
+      return this;
+    }
+  }
+
+  public static class DummyTransformWithConstructorMethod extends DummyTransform {
+
+    public static DummyTransformWithConstructorMethod from(String strField1) {
+      DummyTransformWithConstructorMethod transform = new DummyTransformWithConstructorMethod();
+      transform.strField1 = strField1;
+      return transform;
+    }
+  }
+
+  public static class DummyTransformWithConstructorMethodAndBuilderMethods extends DummyTransform {
+
+    public static DummyTransformWithConstructorMethodAndBuilderMethods from(String strField1) {
+      DummyTransformWithConstructorMethodAndBuilderMethods transform =
+          new DummyTransformWithConstructorMethodAndBuilderMethods();
+      transform.strField1 = strField1;
+      return transform;
+    }
+
+    public DummyTransformWithConstructorMethodAndBuilderMethods withStrField2(String strField2) {
+      this.strField2 = strField2;
+      return this;
+    }
+
+    public DummyTransformWithConstructorMethodAndBuilderMethods withIntField1(int intField1) {
+      this.intField1 = intField1;
+      return this;
+    }
+  }
+
+  public static class DummyTransformWithMultiLanguageAnnotations extends DummyTransform {
+
+    @MultiLanguageConstructorMethod(name = "create_transform")
+    public static DummyTransformWithMultiLanguageAnnotations from(String strField1) {
+      DummyTransformWithMultiLanguageAnnotations transform =
+          new DummyTransformWithMultiLanguageAnnotations();
+      transform.strField1 = strField1;
+      return transform;
+    }
+
+    @MultiLanguageBuilderMethod(name = "abc")
+    public DummyTransformWithMultiLanguageAnnotations withStrField2(String strField2) {
+      this.strField2 = strField2;
+      return this;
+    }
+
+    @MultiLanguageBuilderMethod(name = "xyz")
+    public DummyTransformWithMultiLanguageAnnotations withIntField1(int intField1) {
+      this.intField1 = intField1;
+      return this;
+    }
+  }
+
+  void testClassLookupExpansionRequestConstruction(

Review comment:
       Added more matchers. Tried to not introduce assumptions regarding the exact expansion of transforms.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org