Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/08 17:48:00 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #16947: [BEAM-13416] Introduce Schema provider for AWS model classes extending SdkPojo

TheNeuralBit commented on code in PR #16947:
URL: https://github.com/apache/beam/pull/16947#discussion_r916997350


##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/MessageCoder.java:
##########
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sqs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import software.amazon.awssdk.services.sqs.model.Message;
-
-/** Custom Coder for handling SendMessageRequest for using in Write. */
-public class MessageCoder extends AtomicCoder<Message> implements Serializable {

Review Comment:
   This is public and not `@Internal`, so technically we shouldn't just remove it outright. That being said, this and `SendMessageRequestCoder`, `MessageCoder` are pretty clearly internal coders for use in their respective IOs, so it's probably fine.
   
   I'll leave it up to you: if you want, you could mark these coders deprecated and schedule them for removal in a later release.
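A minimal sketch of the deprecate-then-remove route, using hypothetical stand-ins (`LegacyMessageCoder`, `SchemaBasedCoder`) rather than the real Beam coders. The class stays public and keeps working, but `@Deprecated` makes the compiler warn at every use site, and the `@deprecated` Javadoc tag tells callers what to use instead and when removal is planned.

```java
/** Hypothetical replacement type that callers are pointed to. */
class SchemaBasedCoder {}

/**
 * Hypothetical stand-in for a public coder that should no longer be used directly.
 *
 * @deprecated Use {@link SchemaBasedCoder} instead; scheduled for removal in a later release.
 */
@Deprecated
class LegacyMessageCoder {
  // Behavior stays unchanged while deprecated, so existing callers keep compiling and working.
  String encode(String message) {
    return message;
  }
}
```

Because `@Deprecated` has runtime retention, tooling can also detect the marker reflectively, as the usage check below does.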



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.schemas;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.beam.sdk.io.aws2.schemas.AwsSchemaUtils.getter;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets.difference;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets.newHashSet;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.beam.sdk.io.aws2.schemas.AwsSchemaUtils.SdkBuilderSetter;
+import org.apache.beam.sdk.io.aws2.schemas.AwsTypes.ConverterFactory;
+import org.apache.beam.sdk.schemas.CachingFactory;
+import org.apache.beam.sdk.schemas.Factory;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowWithGetters;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import software.amazon.awssdk.core.SdkField;
+import software.amazon.awssdk.core.SdkPojo;
+import software.amazon.awssdk.utils.builder.SdkBuilder;
+
+/**
+ * Schema provider for AWS {@link SdkPojo} models using the provided field metadata (@see {@link
+ * SdkPojo#sdkFields()}) rather than reflection.
+ *
+ * <p>This provider overrides {@link GetterBasedSchemaProvider#fromRowFunction(TypeDescriptor)} to
+ * instead use the generated builder factories with the setters from {@link SdkField}s, see {@link
+ * SdkField#set(Object, Object)}.
+ *
+ * <p>Note: Beam doesn't support self-recursive schemas. Some AWS models, such as {@link
+ * software.amazon.awssdk.services.dynamodb.model.AttributeValue} are self-recursive and cannot be
+ * used with this schema provider.

Review Comment:
   - Do we recommend continuing to use `AwsCoders` for this case?
   - It could also be helpful to link to the issue tracking recursive schemas.
   - nit: I'd say either "recursive" or "self-referential" instead of "self-recursive"
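For context on why such models are rejected: the schema derivation walks nested model fields depth-first and fails once a type reappears on its own path, which is what the `seen` set threaded through `AwsTypes.fieldType` does. The following is a minimal pure-Java sketch of that check on a toy type graph (type names as strings, nested types as a map). `RecursionCheck` is a hypothetical name, not part of the PR. Copying the path per branch means a type shared by two siblings (a diamond) is still allowed, mirroring `Sets.union(seen, singleton(clazz))`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model: each type name maps to the type names of its nested (model) fields. */
final class RecursionCheck {
  private RecursionCheck() {}

  // Depth-first walk carrying only the types on the current path, so shared
  // (non-recursive) nested types are accepted while true cycles are rejected.
  static void checkNotRecursive(String type, Map<String, List<String>> nested, Set<String> seen) {
    if (seen.contains(type)) {
      throw new IllegalStateException("Recursive types are not supported: " + type);
    }
    Set<String> path = new HashSet<>(seen);
    path.add(type);
    for (String child : nested.getOrDefault(type, List.of())) {
      checkNotRecursive(child, nested, path);
    }
  }
}
```

In DynamoDB's `AttributeValue`, the cycle goes through its list and map members, which in this toy graph collapses to an edge from `AttributeValue` back to itself.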



##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProviderTest.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.schemas;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.ARRAY;
+import static org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray;
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+import static org.apache.beam.sdk.util.SerializableUtils.ensureSerializableRoundTrip;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.function.Function;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Condition;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+
+public class AwsSchemaProviderTest {

Review Comment:
   Does this exercise all the types in the mapping from `AwsTypes`?
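One way to answer this mechanically is to diff the keys of the simple-type mapping in `AwsTypes` against the AWS types the test fields actually exercise. The following is a minimal pure-Java sketch that uses type names as plain strings instead of the SDK's `MarshallingType` constants. `TypeCoverage` and `uncovered` are hypothetical names, and the tested set in the check below is illustrative only, not an audit of the full test file (which is truncated in this excerpt).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TypeCoverage {
  private TypeCoverage() {}

  // Returns the mapped AWS types that no tested field exercises, sorted for stable output.
  static Set<String> uncovered(Map<String, String> typeMapping, Set<String> testedTypes) {
    Set<String> missing = new TreeSet<>(typeMapping.keySet());
    missing.removeAll(testedTypes);
    return missing;
  }
}
```

For example, with the ten entries of `typeMapping` and a tested set containing only `STRING`, `INTEGER` and `SDK_BYTES`, the remaining seven simple types would be reported. `LIST`, `MAP` and `SDK_POJO` go through separate branches in `fieldType` and would need their own check.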



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java:
##########
@@ -281,22 +281,33 @@ public Write<T> withRetryConfiguration(RetryConfiguration retry) {
     }
 
     /**
-     * Encode the full {@code PublishResult} object, including sdkResponseMetadata and

Review Comment:
   You might use `@link` here instead of `@code`. That way, Javadoc would have raised an error for the bad reference.



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java:
##########
@@ -281,22 +281,33 @@ public Write<T> withRetryConfiguration(RetryConfiguration retry) {
     }
 
     /**
-     * Encode the full {@code PublishResult} object, including sdkResponseMetadata and
+     * Encode the full {@code PublishResponse} object, including sdkResponseMetadata and
      * sdkHttpMetadata with the HTTP response headers.
+     *
+     * @deprecated Writes fail fast in case of errors, no need to check headers.
      */
+    @Deprecated
     public Write<T> withFullPublishResponse() {
       return withCoder(PublishResponseCoders.fullPublishResponse());
     }
 
     /**
-     * Encode the full {@code PublishResult} object, including sdkResponseMetadata and
+     * Encode the full {@code PublishResponse} object, including sdkResponseMetadata and
      * sdkHttpMetadata but excluding the HTTP response headers.
+     *
+     * @deprecated Writes fail fast in case of errors, no need to check headers.
      */
+    @Deprecated
     public Write<T> withFullPublishResponseWithoutHeaders() {
       return withCoder(PublishResponseCoders.fullPublishResponseWithoutHeaders());
     }
 
-    /** Encode the {@code PublishResult} with the given coder. */
+    /**
+     * Encode the {@code PublishResponse} with the given coder.
+     *
+     * @deprecated Explicit usage of coders is deprecated.

Review Comment:
   nit: mention that inferred schemas (`AwsSchemaProvider`) will be used instead.



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java:
##########
@@ -37,7 +37,12 @@
 import software.amazon.awssdk.http.SdkHttpResponse;
 import software.amazon.awssdk.utils.ImmutableMap;
 
-/** {@link Coder}s for common AWS SDK objects. */
+/**
+ * {@link Coder}s for common AWS SDK objects.
+ *
+ * @deprecated Use Schema coders instead.

Review Comment:
   nit: link to `AwsSchemaProvider`?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java:
##########
@@ -0,0 +1,220 @@
+/**
+ * Schema provider for AWS {@link SdkPojo} models using the provided field metadata (@see {@link
+ * SdkPojo#sdkFields()}) rather than reflection.
+ *
+ * <p>This provider overrides {@link GetterBasedSchemaProvider#fromRowFunction(TypeDescriptor)} to
+ * instead use the generated builder factories with the setters from {@link SdkField}s, see {@link
+ * SdkField#set(Object, Object)}.

Review Comment:
   This seems like an implementation detail; maybe we should just put it in a (non-Javadoc) comment on the overridden method instead?
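A minimal sketch of the suggested layout, with hypothetical stand-ins (`BaseProvider`, `SdkModelProvider`) and rows simplified to strings. The class Javadoc states only what the provider does, while the note on how it builds objects moves into a plain `//` comment inside the overriding method, so it does not appear in the generated API docs.

```java
import java.util.function.Function;

/** Hypothetical base class standing in for a getter-based schema provider. */
abstract class BaseProvider {
  // Default strategy (stubbed): create user types reflectively.
  Function<String, String> fromRowFunction() {
    return row -> "reflective:" + row;
  }
}

/** Schema provider for SDK models that uses their field metadata rather than reflection. */
class SdkModelProvider extends BaseProvider {
  @Override
  Function<String, String> fromRowFunction() {
    // Implementation detail kept out of the class Javadoc: objects are created through
    // the generated builders and per-field setters instead of reflection.
    return row -> "builder:" + row;
  }
}
```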



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java:
##########
@@ -281,22 +281,33 @@ public Write<T> withRetryConfiguration(RetryConfiguration retry) {
     }
 
     /**
-     * Encode the full {@code PublishResult} object, including sdkResponseMetadata and
+     * Encode the full {@code PublishResponse} object, including sdkResponseMetadata and
      * sdkHttpMetadata with the HTTP response headers.
+     *
+     * @deprecated Writes fail fast in case of errors, no need to check headers.

Review Comment:
   - Is this a change in behavior that we need to communicate to users?
   - Will these functions be removed later?



##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.schemas;
+
+import static java.util.Collections.singleton;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import static software.amazon.awssdk.core.protocol.MarshallingType.INSTANT;
+import static software.amazon.awssdk.core.protocol.MarshallingType.LIST;
+import static software.amazon.awssdk.core.protocol.MarshallingType.MAP;
+import static software.amazon.awssdk.core.protocol.MarshallingType.SDK_BYTES;
+import static software.amazon.awssdk.core.protocol.MarshallingType.SDK_POJO;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.beam.sdk.schemas.Factory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Instant;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.SdkField;
+import software.amazon.awssdk.core.SdkPojo;
+import software.amazon.awssdk.core.protocol.MarshallingType;
+import software.amazon.awssdk.core.traits.ListTrait;
+import software.amazon.awssdk.core.traits.MapTrait;
+import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
+import software.amazon.awssdk.core.util.DefaultSdkAutoConstructMap;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+public class AwsTypes {
+  // Mapping of simple AWS types to schema field types
+  private static final Map<MarshallingType<?>, FieldType> typeMapping =
+      ImmutableMap.<MarshallingType<?>, FieldType>builder()
+          .put(MarshallingType.STRING, FieldType.STRING)
+          .put(MarshallingType.SHORT, FieldType.INT16)
+          .put(MarshallingType.INTEGER, FieldType.INT32)
+          .put(MarshallingType.LONG, FieldType.INT64)
+          .put(MarshallingType.FLOAT, FieldType.FLOAT)
+          .put(MarshallingType.DOUBLE, FieldType.DOUBLE)
+          .put(MarshallingType.BIG_DECIMAL, FieldType.DECIMAL)
+          .put(MarshallingType.BOOLEAN, FieldType.BOOLEAN)
+          .put(INSTANT, FieldType.DATETIME)
+          .put(SDK_BYTES, FieldType.BYTES)
+          .build();
+
+  private static FieldType fieldType(SdkField<?> field, Set<Class<?>> seen) {
+    MarshallingType<?> type = field.marshallingType();
+    if (type == LIST) {
+      return FieldType.array(fieldType(elementField(field), seen));
+    } else if (type == MAP) {
+      return FieldType.map(FieldType.STRING, fieldType(valueField(field), seen));
+    } else if (type == SDK_POJO) {
+      SdkPojo builder = field.constructor().get();
+      Class<?> clazz = targetClassOf(builder);
+      checkState(!seen.contains(clazz), "Self-recursive types are not supported: %s", clazz);
+      return FieldType.row(schemaFor(builder.sdkFields(), Sets.union(seen, singleton(clazz))));
+    }
+    FieldType fieldType = typeMapping.get(type);
+    if (fieldType != null) {
+      return fieldType;
+    }
+    throw new RuntimeException(
+        String.format("Type %s of field %s is unknown.", type, normalizedNameOf(field)));
+  }
+
+  private static Schema schemaFor(List<SdkField<?>> fields, Set<Class<?>> seen) {
+    Schema.Builder builder = Schema.builder();
+    for (SdkField<?> sdkField : fields) {
+      // AWS SDK fields are all optional and marked as nullable
+      builder.addField(Field.nullable(normalizedNameOf(sdkField), fieldType(sdkField, seen)));
+    }
+    return builder.build();
+  }
+
+  static Schema schemaFor(List<SdkField<?>> fields) {
+    return schemaFor(fields, ImmutableSet.of());
+  }
+
+  /**
+   * Converter factory to handle specific AWS types.
+   *
+   * <p>Any occurrences of {@link java.time.Instant} or {@link SdkBytes} are converted to & from the
+   * corresponding Beam types. When used with {@link org.apache.beam.sdk.schemas.FieldValueSetter},
+   * any {@link Row} has to be converted back to the respective {@link SdkPojo}.
+   */
+  @SuppressWarnings("rawtypes")
+  abstract static class ConverterFactory implements Serializable {
+    @SuppressWarnings("nullness")
+    private static final SerializableFunction IDENTITY = x -> x;
+
+    private final SerializableFunction instantConverter;
+    private final SerializableFunction bytesConverter;
+    private final boolean convertPojoType;
+
+    private ConverterFactory(
+        SerializableFunction instantConverter,
+        SerializableFunction bytesConverter,
+        boolean convertPojoType) {
+      this.instantConverter = instantConverter;
+      this.bytesConverter = bytesConverter;
+      this.convertPojoType = convertPojoType;
+    }
+
+    static ConverterFactory toAws(Factory<SerializableFunction<Row, ?>> fromRowFactory) {
+      return new ToAws(fromRowFactory);
+    }
+
+    static ConverterFactory fromAws() {
+      return FromAws.INSTANCE;
+    }
+
+    static <T, X1, X2> BiConsumer<T, X1> createSetter(
+        BiConsumer<T, X2> set, SerializableFunction fn) {
+      return (obj, value) -> set.accept(obj, ((SerializableFunction<X1, X2>) fn).apply(value));
+    }
+
+    SerializableFunction pojoTypeConverter(SdkField<?> field) {
+      throw new UnsupportedOperationException();
+    }
+
+    SerializableFunction create(SdkField<?> field) {
+      return create(IDENTITY, field);
+    }
+
+    SerializableFunction create(SerializableFunction fn, SdkField<?> field) {
+      MarshallingType<?> awsType = field.marshallingType();
+      SerializableFunction converter;
+      if (awsType == SDK_POJO) {
+        converter = pojoTypeConverter(field);
+      } else if (awsType == INSTANT) {
+        converter = instantConverter;
+      } else if (awsType == SDK_BYTES) {
+        converter = bytesConverter;
+      } else if (awsType == LIST) {
+        converter = transformList(create(elementField(field)));
+      } else if (awsType == MAP) {
+        converter = transformMap(create(valueField(field)));
+      } else {
+        throw new IllegalStateException("Unexpected marshalling type " + awsType);
+      }
+      return fn != IDENTITY ? andThen(fn, nullSafe(converter)) : nullSafe(converter);
+    }
+
+    boolean needsConversion(SdkField<?> field) {
+      MarshallingType<?> type = field.marshallingType();
+      return (convertPojoType && type.equals(MarshallingType.SDK_POJO))
+          || type.equals(INSTANT)
+          || type.equals(SDK_BYTES)
+          || (type.equals(MAP) && needsConversion(valueField(field)))
+          || (type.equals(LIST) && needsConversion(elementField(field)));
+    }
+
+    private static SerializableFunction andThen(
+        SerializableFunction fn1, SerializableFunction fn2) {
+      return v -> fn2.apply(fn1.apply(v));
+    }
+
+    @SuppressWarnings("nullness")
+    private static SerializableFunction nullSafe(SerializableFunction fn) {
+      return v -> v == null ? null : fn.apply(v);
+    }
+
+    @SuppressWarnings("nullness")
+    private static SerializableFunction transformList(SerializableFunction fn) {
+      return list -> Lists.transform((List) list, fn::apply);
+    }
+
+    @SuppressWarnings("nullness")
+    private static SerializableFunction transformMap(SerializableFunction fn) {
+      return map -> Maps.transformValues((Map) map, fn::apply);
+    }
+
+    /** Converter factory from Beam row value types to AWS types. This is applicable for setters. */
+    private static class ToAws extends ConverterFactory {
+      private final Factory<SerializableFunction<Row, ?>> fromRowFactory;
+
+      ToAws(Factory<SerializableFunction<Row, ?>> fromRowFactory) {
+        super(AwsTypes::toJavaInstant, AwsTypes::toSdkBytes, true);
+        this.fromRowFactory = fromRowFactory;
+      }
+
+      @Override
+      @SuppressWarnings("nullness") // schema nullable for this factory
+      protected SerializableFunction pojoTypeConverter(SdkField<?> field) {
+        return fromRowFactory.create(targetClassOf(field.constructor().get()), null);
+      }
+    }
+
+    /**
+     * Converter factory from AWS types to Beam raw unmodified row types. This is applicable for
+     * getters and also removes default values for lists & maps to avoid serializing those.
+     */
+    private static class FromAws extends ConverterFactory {
+      private static final ConverterFactory INSTANCE = new FromAws();
+
+      FromAws() {
+        super(AwsTypes::toJodaInstant, AwsTypes::toBytes, false);
+      }
+
+      @Override
+      SerializableFunction create(SerializableFunction fn, SdkField<?> field) {
+        MarshallingType<?> type = field.marshallingType();
+        if (type.equals(MAP)) {
+          fn = skipDefaultMap(fn);
+        } else if (type.equals(LIST)) {
+          fn = skipDefaultList(fn);
+        }
+        return needsConversion(field) ? super.create(fn, field) : fn;
+      }
+
+      @SuppressWarnings("nullness")
+      private static SerializableFunction skipDefaultList(SerializableFunction fn) {
+        return in -> {
+          Object list = fn.apply(in);
+          return list != DefaultSdkAutoConstructList.getInstance() ? list : null;
+        };
+      }
+
+      @SuppressWarnings("nullness")
+      private static SerializableFunction skipDefaultMap(SerializableFunction fn) {
+        return in -> {
+          Object map = fn.apply(in);
+          return map != DefaultSdkAutoConstructMap.getInstance() ? map : null;
+        };
+      }
+    }
+  }
+
+  // Convert upper camel SDK field names to lower camel
+  static String normalizedNameOf(SdkField<?> field) {
+    String name = field.memberName();
+    return name.length() > 1 && Ascii.isLowerCase(name.charAt(1))
+        ? Ascii.toLowerCase(name.charAt(0)) + name.substring(1)
+        : name.toLowerCase(Locale.ROOT);
+  }
+
+  static java.time.Instant toJavaInstant(Object instant) {
+    return java.time.Instant.ofEpochMilli(((Instant) instant).getMillis());
+  }
+
+  private static Instant toJodaInstant(Object instant) {
+    return Instant.ofEpochMilli(((java.time.Instant) instant).toEpochMilli());
+  }
+
+  private static SdkBytes toSdkBytes(Object sdkBytes) {
+    return SdkBytes.fromByteArrayUnsafe((byte[]) sdkBytes); // TODO copy or use unsafe?
+  }
+
+  private static byte[] toBytes(Object sdkBytes) {
+    return ((SdkBytes) sdkBytes).asByteArrayUnsafe(); // TODO copy or use unsafe?

Review Comment:
   Are these TODOs something we can address now, or should there be an issue tracking it?
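The trade-off behind these TODOs: in the AWS SDK v2, `SdkBytes.fromByteArray` and `asByteArray` copy the data, while the `...Unsafe` variants share the underlying array. Sharing avoids an allocation per field, but it is only safe if nobody mutates the array afterwards. The following is a minimal pure-Java sketch of that aliasing difference using plain arrays. `BytesAliasing` and its methods are hypothetical names, not SDK calls.

```java
import java.util.Arrays;

final class BytesAliasing {
  private BytesAliasing() {}

  // "Unsafe" style: return the caller's array as-is (no copy), so later
  // mutations of the source are visible through the result.
  static byte[] wrapUnsafe(byte[] source) {
    return source;
  }

  // Defensive style: copy the array, so the result is isolated from the source.
  static byte[] copy(byte[] source) {
    return Arrays.copyOf(source, source.length);
  }
}
```

Keeping the unsafe variants therefore relies on the convention that pipeline elements are not mutated once emitted. If that assumption is acceptable here, it is probably worth stating in a comment in place of the TODO.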



##########
sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProviderTest.java:
##########
@@ -0,0 +1,269 @@
+public class AwsSchemaProviderTest {
+  private final SchemaRegistry registry = SchemaRegistry.createDefault();
+
+  private interface Schemas {
+    Schema MESSAGE_ATTRIBUTES =
+        Schema.builder()
+            .addNullableField("stringValue", FieldType.STRING)
+            .addNullableField("binaryValue", FieldType.BYTES)
+            .addNullableField("stringListValues", FieldType.array(FieldType.STRING))
+            .addNullableField("binaryListValues", FieldType.array(FieldType.BYTES))
+            .addNullableField("dataType", FieldType.STRING)
+            .build();
+
+    Schema SEND_MESSAGE_REQUEST =
+        Schema.builder()
+            .addNullableField("queueUrl", FieldType.STRING)
+            .addNullableField("messageBody", FieldType.STRING)
+            .addNullableField("delaySeconds", FieldType.INT32)
+            .addNullableField(
+                "messageAttributes",
+                FieldType.map(FieldType.STRING, FieldType.row(MESSAGE_ATTRIBUTES)))
+            .addNullableField(
+                "messageSystemAttributes",
+                FieldType.map(FieldType.STRING, FieldType.row(MESSAGE_ATTRIBUTES)))
+            .addNullableField("messageDeduplicationId", FieldType.STRING)
+            .addNullableField("messageGroupId", FieldType.STRING)
+            .build();
+  }
+
+  @Test
+  public void testSampleSchema() throws NoSuchSchemaException {
+    Schema schema = registry.getSchema(SendMessageRequest.class);
+    SchemaTestUtils.assertSchemaEquivalent(Schemas.SEND_MESSAGE_REQUEST, schema);
+  }
+
+  @Test
+  public void testRecursiveSchema() {
+    assertThatThrownBy(() -> registry.getSchema(AttributeValue.class))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Self-recursive types are not supported: " + AttributeValue.class);
+  }
+
+  @Test
+  public void testToRowSerializable() throws NoSuchSchemaException {
+    ensureSerializableRoundTrip(registry.getToRowFunction(SendMessageRequest.class));
+  }
+
+  @Test
+  public void testFromRowSerializable() throws NoSuchSchemaException {
+    ensureSerializableRoundTrip(registry.getFromRowFunction(SendMessageRequest.class));
+  }
+
+  @Test
+  public void testToRow() throws NoSuchSchemaException {
+    SendMessageRequest request =
+        SendMessageRequest.builder()
+            .queueUrl("queue")
+            .messageBody("body")
+            .delaySeconds(100)
+            .messageDeduplicationId("dedupId")
+            .messageGroupId("groupId")
+            .messageAttributes(
+                ImmutableMap.of(
+                    "string",
+                    attribute(b -> b.stringValue("v").dataType("String")),
+                    "binary",
+                    attribute(b -> b.binaryValue(sdkBytes("v")).dataType("Binary")),
+                    "stringList",
+                    attribute(b -> b.stringListValues("v1", "v2")),
+                    "binaryList",
+                    attribute(b -> b.binaryListValues(sdkBytes("v1"), sdkBytes("v2")))))
+            .build();
+
+    Row row = registry.getToRowFunction(SendMessageRequest.class).apply(request);
+
+    assertThat(row)
+        .has(field("queueUrl", "queue"))
+        .has(field("messageBody", "body"))
+        .has(field("delaySeconds", 100))
+        .has(field("messageDeduplicationId", "dedupId"))
+        .has(field("messageGroupId", "groupId"));
+
+    assertThat((Row) row.getMap("messageAttributes").get("string"))
+        .has(field("dataType", "String"))
+        .has(field("stringValue", "v"))
+        .has(field("binaryValue", null))
+        .has(field("stringListValues", null))
+        .has(field("binaryListValues", null));
+
+    assertThat((Row) row.getMap("messageAttributes").get("binary"))
+        .has(field("dataType", "Binary"))
+        .has(field("stringValue", null))
+        .has(field("binaryValue", bytes("v")))
+        .has(field("stringListValues", null))
+        .has(field("binaryListValues", null));
+
+    assertThat((Row) row.getMap("messageAttributes").get("stringList"))
+        .has(field("dataType", null))
+        .has(field("stringValue", null))
+        .has(field("binaryValue", null))
+        .has(field("stringListValues", ImmutableList.of("v1", "v2")))
+        .has(field("binaryListValues", null));
+
+    assertThat((Row) row.getMap("messageAttributes").get("binaryList"))
+        .has(field("dataType", null))
+        .has(field("stringValue", null))
+        .has(field("binaryValue", null))
+        .has(field("stringListValues", null))
+        .has(field("binaryListValues", ImmutableList.of(bytes("v1"), bytes("v2"))));
+  }
+
+  @Test
+  public void testFromRow() throws NoSuchSchemaException, CoderException {
+    SendMessageRequest request =
+        SendMessageRequest.builder()
+            .queueUrl("queue")
+            .messageBody("body")
+            .delaySeconds(100)
+            .messageDeduplicationId("dedupId")
+            .messageGroupId("groupId")
+            .messageAttributes(
+                ImmutableMap.of(
+                    "string",
+                    attribute(b -> b.stringValue("v").dataType("String")),
+                    "binary",
+                    attribute(b -> b.binaryValue(sdkBytes("v")).dataType("Binary")),
+                    "stringList",
+                    attribute(b -> b.stringListValues("v1", "v2")),
+                    "binaryList",
+                    attribute(b -> b.binaryListValues(sdkBytes("v1"), sdkBytes("v2")))))
+            .build();
+
+    SchemaCoder<SendMessageRequest> coder = registry.getSchemaCoder(SendMessageRequest.class);
+
+    Row row = coder.getToRowFunction().apply(request);
+    assertThat(coder.getFromRowFunction().apply(row)).isEqualTo(request);
+
+    byte[] requestBytes = encodeToByteArray(coder, request);
+    SendMessageRequest requestFromBytes = decodeFromByteArray(coder, requestBytes);
+    assertThat(requestFromBytes).isEqualTo(request);
+
+    // verify still serializable after use
+    ensureSerializableRoundTrip(coder.getToRowFunction());
+    ensureSerializableRoundTrip(coder.getFromRowFunction());
+  }
+
+  @Test
+  public void testFromRowWithPartialSchema() throws NoSuchSchemaException {
+    SerializableFunction<Row, SendMessageRequest> fromRow =
+        registry.getFromRowFunction(SendMessageRequest.class);
+
+    Schema partialSchema =
+        Schema.builder()
+            .addNullableField("queueUrl", FieldType.STRING)
+            .addNullableField("messageBody", FieldType.STRING)
+            .addNullableField("delaySeconds", FieldType.INT32)
+            .build();
+
+    SendMessageRequest request =
+        SendMessageRequest.builder()
+            .queueUrl("queue")
+            .messageBody("body")
+            .delaySeconds(100)
+            .build();
+
+    Row row = Row.withSchema(partialSchema).addValues("queue", "body", 100).build();
+
+    assertThat(fromRow.apply(row)).isEqualTo(request);
+  }
+
+  @Test
+  public void testFailFromRowOnUnknownField() throws NoSuchSchemaException {
+    SerializableFunction<Row, SendMessageRequest> fromRow =
+        registry.getFromRowFunction(SendMessageRequest.class);
+
+    Schema partialSchema =
+        Schema.builder()
+            .addNullableField("queueUrl", FieldType.STRING)
+            .addNullableField("messageBody", FieldType.STRING)
+            .addNullableField("unknownField", FieldType.INT32)
+            .build();
+
+    Row row = Row.withSchema(partialSchema).addValues("queue", "body", 100).build();
+
+    assertThatThrownBy(() -> fromRow.apply(row))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Row schema contains unknown fields: [unknownField]");
+  }
+
+  private static MessageAttributeValue attribute(
+      Function<MessageAttributeValue.Builder, MessageAttributeValue.Builder> b) {
+    return b.apply(MessageAttributeValue.builder()).build();
+  }
+
+  private static SdkBytes sdkBytes(String str) {
+    return SdkBytes.fromByteArrayUnsafe(bytes(str));
+  }
+
+  private static byte[] bytes(String str) {
+    return str.getBytes(UTF_8);
+  }
+
+  private <T> Condition<Row> field(String name, T value) {

Review Comment:
   This is handy; maybe it should be in [SchemaTestUtils](https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java) for discoverability?
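   The tests above pass `byte[]` values (`field("binaryValue", bytes("v"))`) and `List<byte[]>` values (`field("binaryListValues", ImmutableList.of(...))`) to `field(...)`. A shared version of the helper would therefore need content-based equality, because `byte[]` and `List.equals()` over arrays compare by reference. The `field` body is truncated in this diff, so the sketch below is not the PR's implementation. It is a hypothetical, dependency-free version of the matching logic. The class name `FieldConditions`, its methods, and the `BiFunction` field reader are assumptions and not existing Beam API.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
 * Hypothetical shared test helper (name and location are illustrative only).
 * Matches a record whose named field equals an expected value, comparing by content.
 */
final class FieldConditions {
  private FieldConditions() {}

  /**
   * Returns a predicate that reads field {@code name} via {@code reader} and compares it
   * to {@code expected} using {@link #valueEquals}.
   */
  public static <R> Predicate<R> field(
      BiFunction<? super R, String, ?> reader, String name, Object expected) {
    return record -> valueEquals(reader.apply(record, name), expected);
  }

  /**
   * Content-based equality: byte[] compares byte by byte, and lists compare element-wise,
   * so a List<byte[]> matches another list holding arrays with the same contents.
   */
  public static boolean valueEquals(Object actual, Object expected) {
    return Objects.deepEquals(normalize(actual), normalize(expected));
  }

  private static Object normalize(Object value) {
    // List.equals() calls element equals(), which is reference equality for byte[].
    // Converting to Object[] lets Objects.deepEquals delegate to Arrays.deepEquals.
    return value instanceof List ? ((List<?>) value).toArray() : value;
  }
}
```

   With Beam and AssertJ on the classpath, the predicate could then be wrapped roughly as `new Condition<Row>(FieldConditions.<Row>field((row, n) -> row.getValue(n), name, value), "field %s = %s", name, value)`, using AssertJ's `Condition(Predicate, String, Object...)` constructor. Keeping the comparison in a plain static method also lets non-AssertJ tests reuse it.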


