You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2019/12/25 18:21:01 UTC
[beam] branch master updated: Merge pull request #10449:
[BEAM-7274] Implement the Protobuf schema provider for compiled protocol
buffers
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bb3c63b Merge pull request #10449: [BEAM-7274] Implement the Protobuf schema provider for compiled protocol buffers
bb3c63b is described below
commit bb3c63b647956a3c0dd055a786aa252c30fdae50
Author: reuvenlax <re...@google.com>
AuthorDate: Wed Dec 25 08:20:50 2019 -1000
Merge pull request #10449: [BEAM-7274] Implement the Protobuf schema provider for compiled protocol buffers
---
.../apache/beam/sdk/schemas/FieldValueSetter.java | 2 +-
.../sdk/schemas/FieldValueTypeInformation.java | 30 +-
.../beam/sdk/schemas/FromRowUsingCreator.java | 48 +-
.../beam/sdk/schemas/transforms/Convert.java | 2 +-
.../beam/sdk/schemas/utils/AvroByteBuddyUtils.java | 2 +-
.../beam/sdk/schemas/utils/ByteBuddyUtils.java | 3 +-
.../beam/sdk/schemas/utils/JavaBeanUtils.java | 35 +-
.../apache/beam/sdk/schemas/utils/POJOUtils.java | 12 +-
.../beam/sdk/schemas/utils/ReflectUtils.java | 36 +-
.../org/apache/beam/sdk/values/RowWithGetters.java | 10 +
.../extensions/protobuf/ProtoByteBuddyUtils.java | 663 +++++++++++++++++++++
.../extensions/protobuf/ProtoMessageSchema.java | 125 ++++
.../protobuf/ProtoMessageSchemaTest.java | 249 ++++++++
.../protobuf/src/test/resources/README.md | 34 ++
.../beam/sdk/extensions/protobuf/test_option_v1.pb | Bin 0 -> 18745 bytes
.../src/test/resources/test/option/v1/option.proto | 137 +++++
.../src/test/resources/test/option/v1/simple.proto | 67 +++
17 files changed, 1381 insertions(+), 74 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
index 5d9e82b..db7caaa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.annotations.Internal;
*
* <p>An interface to set a field of a class.
*
- * <p>Implementations of this interface are generated at runtime to map Row fields back into objet
+ * <p>Implementations of this interface are generated at runtime to map Row fields back into object
* fields.
*/
@Internal
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
index a6ecc45..33ed888 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -22,7 +22,10 @@ import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -47,6 +50,8 @@ public abstract class FieldValueTypeInformation implements Serializable {
@Nullable
public abstract Method getMethod();
+ public abstract Map<String, FieldValueTypeInformation> getOneOfTypes();
+
/** If the field is a container type, returns the element type. */
@Nullable
public abstract FieldValueTypeInformation getElementType();
@@ -62,7 +67,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
abstract Builder toBuilder();
@AutoValue.Builder
- abstract static class Builder {
+ public abstract static class Builder {
public abstract Builder setName(String name);
public abstract Builder setNullable(boolean nullable);
@@ -75,6 +80,8 @@ public abstract class FieldValueTypeInformation implements Serializable {
public abstract Builder setMethod(@Nullable Method method);
+ public abstract Builder setOneOfTypes(Map<String, FieldValueTypeInformation> oneOfTypes);
+
public abstract Builder setElementType(@Nullable FieldValueTypeInformation elementType);
public abstract Builder setMapKeyType(@Nullable FieldValueTypeInformation mapKeyType);
@@ -84,6 +91,22 @@ public abstract class FieldValueTypeInformation implements Serializable {
abstract FieldValueTypeInformation build();
}
+ public static FieldValueTypeInformation forOneOf(
+ String name, boolean nullable, Map<String, FieldValueTypeInformation> oneOfTypes) {
+ final TypeDescriptor<OneOfType.Value> typeDescriptor = TypeDescriptor.of(OneOfType.Value.class);
+ return new AutoValue_FieldValueTypeInformation.Builder()
+ .setName(name)
+ .setNullable(nullable)
+ .setType(typeDescriptor)
+ .setRawType(typeDescriptor.getRawType())
+ .setField(null)
+ .setElementType(null)
+ .setMapKeyType(null)
+ .setMapValueType(null)
+ .setOneOfTypes(oneOfTypes)
+ .build();
+ }
+
public static FieldValueTypeInformation forField(Field field) {
TypeDescriptor type = TypeDescriptor.of(field.getGenericType());
return new AutoValue_FieldValueTypeInformation.Builder()
@@ -95,6 +118,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
.setElementType(getIterableComponentType(field))
.setMapKeyType(getMapKeyType(field))
.setMapValueType(getMapValueType(field))
+ .setOneOfTypes(Collections.emptyMap())
.build();
}
@@ -119,6 +143,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
.setElementType(getIterableComponentType(type))
.setMapKeyType(getMapKeyType(type))
.setMapValueType(getMapValueType(type))
+ .setOneOfTypes(Collections.emptyMap())
.build();
}
@@ -148,6 +173,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
.setElementType(getIterableComponentType(type))
.setMapKeyType(getMapKeyType(type))
.setMapValueType(getMapValueType(type))
+ .setOneOfTypes(Collections.emptyMap())
.build();
}
@@ -175,6 +201,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
.setElementType(getIterableComponentType(componentType))
.setMapKeyType(getMapKeyType(componentType))
.setMapValueType(getMapValueType(componentType))
+ .setOneOfTypes(Collections.emptyMap())
.build();
}
@@ -217,6 +244,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
.setElementType(getIterableComponentType(mapType))
.setMapKeyType(getMapKeyType(mapType))
.setMapValueType(getMapValueType(mapType))
+ .setOneOfTypes(Collections.emptyMap())
.build();
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 61c0d05..499991f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -28,6 +28,7 @@ import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithGetters;
@@ -80,13 +81,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
FieldValueTypeInformation typeInformation = checkNotNull(typeInformations.get(i));
params[i] =
fromValue(
- type,
- row.getValue(i),
- typeInformation.getRawType(),
- typeInformation.getElementType(),
- typeInformation.getMapKeyType(),
- typeInformation.getMapValueType(),
- typeFactory);
+ type, row.getValue(i), typeInformation.getRawType(), typeInformation, typeFactory);
}
SchemaUserTypeCreator creator = schemaTypeCreatorFactory.create(clazz, schema);
@@ -99,10 +94,11 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
FieldType type,
ValueT value,
Type fieldType,
- FieldValueTypeInformation elementType,
- FieldValueTypeInformation keyType,
- FieldValueTypeInformation valueType,
+ FieldValueTypeInformation fieldValueTypeInformation,
Factory<List<FieldValueTypeInformation>> typeFactory) {
+ FieldValueTypeInformation elementType = fieldValueTypeInformation.getElementType();
+ FieldValueTypeInformation keyType = fieldValueTypeInformation.getMapKeyType();
+ FieldValueTypeInformation valueType = fieldValueTypeInformation.getMapValueType();
if (value == null) {
return null;
}
@@ -127,6 +123,22 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
valueType,
typeFactory);
} else {
+ if (type.getTypeName().isLogicalType()
+ && OneOfType.IDENTIFIER.equals(type.getLogicalType().getIdentifier())) {
+ OneOfType oneOfType = type.getLogicalType(OneOfType.class);
+ OneOfType.Value oneOfValue = oneOfType.toInputType((Row) value);
+ FieldValueTypeInformation oneOfFieldValueTypeInformation =
+ checkNotNull(
+ fieldValueTypeInformation.getOneOfTypes().get(oneOfValue.getCaseType().toString()));
+ Object fromValue =
+ fromValue(
+ oneOfValue.getFieldType(),
+ oneOfValue.getValue(),
+ oneOfFieldValueTypeInformation.getRawType(),
+ oneOfFieldValueTypeInformation,
+ typeFactory);
+ return (ValueT) oneOfType.createValue(oneOfValue.getCaseType(), fromValue);
+ }
return value;
}
}
@@ -156,9 +168,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
elementType,
element,
elementTypeInformation.getType().getType(),
- elementTypeInformation.getElementType(),
- elementTypeInformation.getMapKeyType(),
- elementTypeInformation.getMapValueType(),
+ elementTypeInformation,
typeFactory));
}
@@ -175,9 +185,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
elementType,
element,
elementTypeInformation.getType().getType(),
- elementTypeInformation.getElementType(),
- elementTypeInformation.getMapKeyType(),
- elementTypeInformation.getMapValueType(),
+ elementTypeInformation,
typeFactory));
}
@@ -196,18 +204,14 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
keyType,
entry.getKey(),
keyTypeInformation.getType().getType(),
- keyTypeInformation.getElementType(),
- keyTypeInformation.getMapKeyType(),
- keyTypeInformation.getMapValueType(),
+ keyTypeInformation,
typeFactory);
Object value =
fromValue(
valueType,
entry.getValue(),
valueTypeInformation.getType().getType(),
- valueTypeInformation.getElementType(),
- valueTypeInformation.getMapKeyType(),
- valueTypeInformation.getMapValueType(),
+ valueTypeInformation,
typeFactory);
newMap.put(key, value);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
index 5176def..5233b0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
@@ -40,7 +40,7 @@ public class Convert {
* Convert a {@link PCollection}{@literal <InputT>} into a {@link PCollection}{@literal <Row>}.
*
* <p>The input {@link PCollection} must have a schema attached. The output collection will have
- * the same schema as the iput.
+ * the same schema as the input.
*/
public static <InputT> PTransform<PCollection<InputT>, PCollection<Row>> toRows() {
return to(Row.class);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
index fd7f601..436da6c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
@@ -54,7 +54,7 @@ class AvroByteBuddyUtils {
static <T extends SpecificRecord> SchemaUserTypeCreator getCreator(
Class<T> clazz, Schema schema) {
return CACHED_CREATORS.computeIfAbsent(
- new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
+ ClassWithSchema.create(clazz, schema), c -> createCreator(clazz, schema));
}
private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index 791dafb..c00d5d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -94,7 +94,6 @@ public class ByteBuddyUtils {
new ForLoadedType(ReadableInstant.class);
private static final ForLoadedType READABLE_PARTIAL_TYPE =
new ForLoadedType(ReadablePartial.class);
- private static final ForLoadedType OBJECT_TYPE = new ForLoadedType(Object.class);
private static final ForLoadedType INTEGER_TYPE = new ForLoadedType(Integer.class);
private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class);
private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE =
@@ -134,7 +133,7 @@ public class ByteBuddyUtils {
// Create a new FieldValueGetter subclass.
@SuppressWarnings("unchecked")
- static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
+ public static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
ByteBuddy byteBuddy, Type objectType, Type fieldType) {
TypeDescription.Generic getterGenericType =
TypeDescription.Generic.Builder.parameterizedType(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 759d77d..e25342b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -102,7 +102,7 @@ public class JavaBeanUtils {
public static List<FieldValueTypeInformation> getFieldTypes(
Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
return CACHED_FIELD_TYPES.computeIfAbsent(
- new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
+ ClassWithSchema.create(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
}
// The list of getters for a class is cached, so we only create the classes the first time
@@ -121,7 +121,7 @@ public class JavaBeanUtils {
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_GETTERS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return types.stream()
@@ -130,7 +130,7 @@ public class JavaBeanUtils {
});
}
- private static <T> FieldValueGetter createGetter(
+ public static <T> FieldValueGetter createGetter(
FieldValueTypeInformation typeInformation, TypeConversionsFactory typeConversionsFactory) {
DynamicType.Builder<FieldValueGetter> builder =
ByteBuddyUtils.subclassGetterInterface(
@@ -184,7 +184,7 @@ public class JavaBeanUtils {
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_SETTERS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return types.stream()
@@ -193,14 +193,14 @@ public class JavaBeanUtils {
});
}
- private static FieldValueSetter createSetter(
+ public static FieldValueSetter createSetter(
FieldValueTypeInformation typeInformation, TypeConversionsFactory typeConversionsFactory) {
DynamicType.Builder<FieldValueSetter> builder =
ByteBuddyUtils.subclassSetterInterface(
BYTE_BUDDY,
typeInformation.getMethod().getDeclaringClass(),
typeConversionsFactory.createTypeConversion(false).convert(typeInformation.getType()));
- builder = implementSetterMethods(builder, typeInformation.getMethod(), typeConversionsFactory);
+ builder = implementSetterMethods(builder, typeInformation, typeConversionsFactory);
try {
return builder
.make()
@@ -222,14 +222,13 @@ public class JavaBeanUtils {
private static DynamicType.Builder<FieldValueSetter> implementSetterMethods(
DynamicType.Builder<FieldValueSetter> builder,
- Method method,
+ FieldValueTypeInformation fieldValueTypeInformation,
TypeConversionsFactory typeConversionsFactory) {
- FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method);
return builder
.method(ElementMatchers.named("name"))
- .intercept(FixedValue.reference(javaTypeInformation.getName()))
+ .intercept(FixedValue.reference(fieldValueTypeInformation.getName()))
.method(ElementMatchers.named("set"))
- .intercept(new InvokeSetterInstruction(method, typeConversionsFactory));
+ .intercept(new InvokeSetterInstruction(fieldValueTypeInformation, typeConversionsFactory));
}
// The list of constructors for a class is cached, so we only create the classes the first time
@@ -244,7 +243,7 @@ public class JavaBeanUtils {
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_CREATORS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return createConstructorCreator(
@@ -291,7 +290,7 @@ public class JavaBeanUtils {
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_CREATORS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return createStaticCreator(clazz, creator, schema, types, typeConversionsFactory);
@@ -377,11 +376,13 @@ public class JavaBeanUtils {
// Implements a method to write a public set out on an object.
private static class InvokeSetterInstruction implements Implementation {
// Setter method that will be invoked
- private Method method;
+ private FieldValueTypeInformation fieldValueTypeInformation;
private final TypeConversionsFactory typeConversionsFactory;
- InvokeSetterInstruction(Method method, TypeConversionsFactory typeConversionsFactory) {
- this.method = method;
+ InvokeSetterInstruction(
+ FieldValueTypeInformation fieldValueTypeInformation,
+ TypeConversionsFactory typeConversionsFactory) {
+ this.fieldValueTypeInformation = fieldValueTypeInformation;
this.typeConversionsFactory = typeConversionsFactory;
}
@@ -393,13 +394,13 @@ public class JavaBeanUtils {
@Override
public ByteCodeAppender appender(final Target implementationTarget) {
return (methodVisitor, implementationContext, instrumentedMethod) -> {
- FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method);
// this + method parameters.
int numLocals = 1 + instrumentedMethod.getParameters().size();
// The instruction to read the field.
StackManipulation readField = MethodVariableAccess.REFERENCE.loadFrom(2);
+ Method method = fieldValueTypeInformation.getMethod();
boolean setterMethodReturnsVoid = method.getReturnType().equals(Void.TYPE);
// Read the object onto the stack.
StackManipulation stackManipulation =
@@ -409,7 +410,7 @@ public class JavaBeanUtils {
// Do any conversions necessary.
typeConversionsFactory
.createSetterConversions(readField)
- .convert(javaTypeInformation.getType()),
+ .convert(fieldValueTypeInformation.getType()),
// Now update the field and return void.
MethodInvocation.invoke(new ForLoadedMethod(method)));
if (!setterMethodReturnsVoid) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index a58ddf8..aa968b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -81,7 +81,7 @@ public class POJOUtils {
public static List<FieldValueTypeInformation> getFieldTypes(
Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
return CACHED_FIELD_TYPES.computeIfAbsent(
- new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
+ ClassWithSchema.create(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
}
// The list of getters for a class is cached, so we only create the classes the first time
@@ -96,7 +96,7 @@ public class POJOUtils {
TypeConversionsFactory typeConversionsFactory) {
// Return the getters ordered by their position in the schema.
return CACHED_GETTERS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
List<FieldValueGetter> getters =
@@ -122,7 +122,7 @@ public class POJOUtils {
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_CREATORS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return createSetFieldCreator(clazz, schema, types, typeConversionsFactory);
@@ -169,7 +169,7 @@ public class POJOUtils {
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_CREATORS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return createConstructorCreator(
@@ -217,7 +217,7 @@ public class POJOUtils {
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_CREATORS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return createStaticCreator(clazz, creator, schema, types, typeConversionsFactory);
@@ -323,7 +323,7 @@ public class POJOUtils {
TypeConversionsFactory typeConversionsFactory) {
// Return the setters, ordered by their position in the schema.
return CACHED_SETTERS.computeIfAbsent(
- new ClassWithSchema(clazz, schema),
+ ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return types.stream()
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
index d56f0bd..08c494c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.schemas.utils;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import com.google.auto.value.AutoValue;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -31,7 +32,6 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.Schema;
@@ -39,35 +39,21 @@ import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimaps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;
/** A set of reflection helper methods. */
public class ReflectUtils {
/** Represents a class and a schema. */
- public static class ClassWithSchema {
- private final Class clazz;
- private final Schema schema;
+ @AutoValue
+ public abstract static class ClassWithSchema {
+ public abstract Class getClazz();
- public ClassWithSchema(Class clazz, Schema schema) {
- this.clazz = clazz;
- this.schema = schema;
- }
+ public abstract Schema getSchema();
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ClassWithSchema that = (ClassWithSchema) o;
- return Objects.equals(clazz, that.clazz) && Objects.equals(schema, that.schema);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(clazz, schema);
+ public static ClassWithSchema create(Class clazz, Schema schema) {
+ return new AutoValue_ReflectUtils_ClassWithSchema(clazz, schema);
}
}
@@ -94,6 +80,10 @@ public class ReflectUtils {
});
}
+ public static Multimap<String, Method> getMethodsMap(Class clazz) {
+ return Multimaps.index(getMethods(clazz), Method::getName);
+ }
+
@Nullable
public static Constructor getAnnotatedConstructor(Class clazz) {
return Arrays.stream(clazz.getDeclaredConstructors())
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
index ebf59b9..0d68709 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -122,6 +123,15 @@ public class RowWithGetters extends Row {
cacheKey, i -> getMapValue(type.getMapKeyType(), type.getMapValueType(), map))
: (T) getMapValue(type.getMapKeyType(), type.getMapValueType(), map);
} else {
+ if (type.isLogicalType(OneOfType.IDENTIFIER)) {
+ OneOfType oneOfType = type.getLogicalType(OneOfType.class);
+ OneOfType.Value oneOfValue = (OneOfType.Value) fieldValue;
+ Object convertedOneOfField =
+ getValue(oneOfValue.getFieldType(), oneOfValue.getValue(), null);
+ return (T)
+ oneOfType.toBaseType(
+ oneOfType.createValue(oneOfValue.getCaseType(), convertedOneOfField));
+ }
return (T) fieldValue;
}
}
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java
new file mode 100644
index 0000000..6827fb1
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getFieldNumber;
+
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Duration;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.ProtocolMessageEnum;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.DurationNanos;
+import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.TimestampNanos;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType.Value;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForSetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.InjectPackageStrategy;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversion;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.ByteBuddy;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.method.MethodDescription;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.type.TypeDescription;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.DynamicType;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.Implementation;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.StackManipulation;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.Assigner;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+
+public class ProtoByteBuddyUtils {
+ private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+ private static final TypeDescriptor<ByteString> BYTE_STRING_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(ByteString.class);
+ private static final TypeDescriptor<Timestamp> PROTO_TIMESTAMP_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(Timestamp.class);
+ private static final TypeDescriptor<Duration> PROTO_DURATION_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(Duration.class);
+ private static final TypeDescriptor<Int32Value> PROTO_INT32_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(Int32Value.class);
+ private static final TypeDescriptor<Int64Value> PROTO_INT64_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(Int64Value.class);
+ private static final TypeDescriptor<UInt32Value> PROTO_UINT32_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(UInt32Value.class);
+ private static final TypeDescriptor<UInt64Value> PROTO_UINT64_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(UInt64Value.class);
+ private static final TypeDescriptor<FloatValue> PROTO_FLOAT_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(FloatValue.class);
+ private static final TypeDescriptor<DoubleValue> PROTO_DOUBLE_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(DoubleValue.class);
+ private static final TypeDescriptor<BoolValue> PROTO_BOOL_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(BoolValue.class);
+ private static final TypeDescriptor<StringValue> PROTO_STRING_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(StringValue.class);
+ private static final TypeDescriptor<BytesValue> PROTO_BYTES_VALUE_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(BytesValue.class);
+
+ private static final ForLoadedType BYTE_STRING_TYPE = new ForLoadedType(ByteString.class);
+ private static final ForLoadedType BYTE_ARRAY_TYPE = new ForLoadedType(byte[].class);
+ private static final ForLoadedType PROTO_ENUM_TYPE = new ForLoadedType(ProtocolMessageEnum.class);
+ private static final ForLoadedType INTEGER_TYPE = new ForLoadedType(Integer.class);
+
+ private static final Map<TypeDescriptor<?>, ForLoadedType> WRAPPER_LOADED_TYPES =
+ ImmutableMap.<TypeDescriptor<?>, ForLoadedType>builder()
+ .put(PROTO_INT32_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(Int32Value.class))
+ .put(PROTO_INT64_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(Int64Value.class))
+ .put(PROTO_UINT32_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(UInt32Value.class))
+ .put(PROTO_UINT64_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(UInt64Value.class))
+ .put(PROTO_FLOAT_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(FloatValue.class))
+ .put(PROTO_DOUBLE_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(DoubleValue.class))
+ .put(PROTO_BOOL_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(BoolValue.class))
+ .put(PROTO_STRING_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(StringValue.class))
+ .put(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(BytesValue.class))
+ .build();
+
+ private static final Map<TypeName, String> PROTO_GETTER_SUFFIX =
+ ImmutableMap.of(
+ TypeName.ARRAY, "List",
+ TypeName.ITERABLE, "List",
+ TypeName.MAP, "Map");
+
+ private static final Map<TypeName, String> PROTO_SETTER_PREFIX =
+ ImmutableMap.of(
+ TypeName.ARRAY, "addAll",
+ TypeName.ITERABLE, "addAll",
+ TypeName.MAP, "putAll");
+ private static final String DEFAULT_PROTO_GETTER_PREFIX = "get";
+ private static final String DEFAULT_PROTO_SETTER_PREFIX = "set";
+
+ static String protoGetterName(String name, FieldType fieldType) {
+ final String camel = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, name);
+ return DEFAULT_PROTO_GETTER_PREFIX
+ + camel
+ + PROTO_GETTER_SUFFIX.getOrDefault(fieldType.getTypeName(), "");
+ }
+
+ static String protoSetterName(String name, FieldType fieldType) {
+ final String camel = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, name);
+ return protoSetterPrefix(fieldType) + camel;
+ }
+
+ static String protoSetterPrefix(FieldType fieldType) {
+ return PROTO_SETTER_PREFIX.getOrDefault(fieldType.getTypeName(), DEFAULT_PROTO_SETTER_PREFIX);
+ }
+
+ static class ProtoConvertType extends ConvertType {
+ ProtoConvertType(boolean returnRawValues) {
+ super(returnRawValues);
+ }
+
+ private static final Map<TypeDescriptor<?>, Class<?>> TYPE_OVERRIDES =
+ ImmutableMap.<TypeDescriptor<?>, Class<?>>builder()
+ .put(PROTO_TIMESTAMP_TYPE_DESCRIPTOR, Row.class)
+ .put(PROTO_DURATION_TYPE_DESCRIPTOR, Row.class)
+ .put(PROTO_INT32_VALUE_TYPE_DESCRIPTOR, Integer.class)
+ .put(PROTO_INT64_VALUE_TYPE_DESCRIPTOR, Long.class)
+ .put(PROTO_UINT32_VALUE_TYPE_DESCRIPTOR, Integer.class)
+ .put(PROTO_UINT64_VALUE_TYPE_DESCRIPTOR, Long.class)
+ .put(PROTO_FLOAT_VALUE_TYPE_DESCRIPTOR, Float.class)
+ .put(PROTO_DOUBLE_VALUE_TYPE_DESCRIPTOR, Double.class)
+ .put(PROTO_BOOL_VALUE_TYPE_DESCRIPTOR, Boolean.class)
+ .put(PROTO_STRING_VALUE_TYPE_DESCRIPTOR, String.class)
+ .put(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR, byte[].class)
+ .build();
+
+ @Override
+ public Type convert(TypeDescriptor typeDescriptor) {
+ if (typeDescriptor.equals(BYTE_STRING_TYPE_DESCRIPTOR)
+ || typeDescriptor.isSubtypeOf(BYTE_STRING_TYPE_DESCRIPTOR)) {
+ return byte[].class;
+ } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ProtocolMessageEnum.class))) {
+ return Integer.class;
+ } else if (typeDescriptor.equals(PROTO_TIMESTAMP_TYPE_DESCRIPTOR)
+ || typeDescriptor.equals(PROTO_DURATION_TYPE_DESCRIPTOR)) {
+ return Row.class;
+ } else {
+ Type type = TYPE_OVERRIDES.get(typeDescriptor);
+ return (type != null) ? type : super.convert(typeDescriptor);
+ }
+ }
+ }
+
+ static class ProtoConvertValueForGetter extends ConvertValueForGetter {
+ ProtoConvertValueForGetter(StackManipulation readValue) {
+ super(readValue);
+ }
+
+ @Override
+ protected ProtoTypeConversionsFactory getFactory() {
+ return new ProtoTypeConversionsFactory();
+ }
+
+ @Override
+ public StackManipulation convert(TypeDescriptor type) {
+ if (type.equals(BYTE_STRING_TYPE_DESCRIPTOR)
+ || type.isSubtypeOf(BYTE_STRING_TYPE_DESCRIPTOR)) {
+ return new Compound(
+ readValue,
+ MethodInvocation.invoke(
+ BYTE_STRING_TYPE
+ .getDeclaredMethods()
+ .filter(ElementMatchers.named("toByteArray"))
+ .getOnly()));
+ } else if (type.isSubtypeOf(TypeDescriptor.of(ProtocolMessageEnum.class))) {
+ return new Compound(
+ readValue,
+ MethodInvocation.invoke(
+ PROTO_ENUM_TYPE
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.named("getNumber").and(ElementMatchers.takesArguments(0)))
+ .getOnly()),
+ Assigner.DEFAULT.assign(
+ INTEGER_TYPE.asUnboxed().asGenericType(),
+ INTEGER_TYPE.asGenericType(),
+ Typing.STATIC));
+ } else if (type.equals(PROTO_TIMESTAMP_TYPE_DESCRIPTOR)) {
+ return new Compound(
+ readValue,
+ MethodInvocation.invoke(
+ new ForLoadedType(TimestampNanos.class)
+ .getDeclaredMethods()
+ .filter(ElementMatchers.named("toRow"))
+ .getOnly()));
+ } else if (type.equals(PROTO_DURATION_TYPE_DESCRIPTOR)) {
+ return new Compound(
+ readValue,
+ MethodInvocation.invoke(
+ new ForLoadedType(DurationNanos.class)
+ .getDeclaredMethods()
+ .filter(ElementMatchers.named("toRow"))
+ .getOnly()));
+ } else {
+ ForLoadedType wrapperType = WRAPPER_LOADED_TYPES.get(type);
+ if (wrapperType != null) {
+ MethodDescription.InDefinedShape getValueMethod =
+ wrapperType.getDeclaredMethods().filter(ElementMatchers.named("getValue")).getOnly();
+ TypeDescription.Generic returnType = getValueMethod.getReturnType();
+ StackManipulation stackManipulation =
+ new Compound(
+ readValue,
+ MethodInvocation.invoke(getValueMethod),
+ Assigner.DEFAULT.assign(
+ returnType, returnType.asErasure().asBoxed().asGenericType(), Typing.STATIC));
+ if (type.equals(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR)) {
+ stackManipulation =
+ getFactory()
+ .createGetterConversions(stackManipulation)
+ .convert(BYTE_STRING_TYPE_DESCRIPTOR);
+ }
+ return stackManipulation;
+ }
+ return super.convert(type);
+ }
+ }
+ }
+
+ static class ProtoConvertValueForSetter extends ConvertValueForSetter {
+ ProtoConvertValueForSetter(StackManipulation readValue) {
+ super(readValue);
+ }
+
+ @Override
+ protected ProtoTypeConversionsFactory getFactory() {
+ return new ProtoTypeConversionsFactory();
+ }
+
+ @Override
+ public StackManipulation convert(TypeDescriptor type) {
+ if (type.isSubtypeOf(TypeDescriptor.of(ByteString.class))) {
+ return new Compound(
+ readValue,
+ TypeCasting.to(BYTE_ARRAY_TYPE),
+ MethodInvocation.invoke(
+ BYTE_STRING_TYPE
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.named("copyFrom")
+ .and(ElementMatchers.takesArguments(BYTE_ARRAY_TYPE)))
+ .getOnly()));
+ } else if (type.isSubtypeOf(TypeDescriptor.of(ProtocolMessageEnum.class))) {
+ ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+ // Convert the stored number back to the enum constant.
+ return new Compound(
+ readValue,
+ Assigner.DEFAULT.assign(
+ INTEGER_TYPE.asBoxed().asGenericType(),
+ INTEGER_TYPE.asUnboxed().asGenericType(),
+ Typing.STATIC),
+ MethodInvocation.invoke(
+ loadedType
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.named("forNumber")
+ .and(ElementMatchers.isStatic().and(ElementMatchers.takesArguments(1))))
+ .getOnly()));
+ } else if (type.equals(PROTO_TIMESTAMP_TYPE_DESCRIPTOR)) {
+ return new Compound(
+ readValue,
+ MethodInvocation.invoke(
+ new ForLoadedType(TimestampNanos.class)
+ .getDeclaredMethods()
+ .filter(ElementMatchers.named("toTimestamp"))
+ .getOnly()));
+ } else if (type.equals(PROTO_DURATION_TYPE_DESCRIPTOR)) {
+ return new Compound(
+ readValue,
+ MethodInvocation.invoke(
+ new ForLoadedType(DurationNanos.class)
+ .getDeclaredMethods()
+ .filter(ElementMatchers.named("toDuration"))
+ .getOnly()));
+ } else {
+ ForLoadedType wrapperType = WRAPPER_LOADED_TYPES.get(type);
+ if (wrapperType != null) {
+ if (type.equals(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR)) {
+ readValue =
+ getFactory()
+ .createSetterConversions(readValue)
+ .convert(TypeDescriptor.of(ByteString.class));
+ }
+ MethodDescription.InDefinedShape ofMethod =
+ wrapperType.getDeclaredMethods().filter(ElementMatchers.named("of")).getOnly();
+ TypeDescription.Generic argumentType = ofMethod.getParameters().get(0).getType();
+ return new Compound(
+ readValue,
+ Assigner.DEFAULT.assign(
+ argumentType.asErasure().asBoxed().asGenericType(), argumentType, Typing.STATIC),
+ MethodInvocation.invoke(ofMethod));
+ } else {
+ return super.convert(type);
+ }
+ }
+ }
+ }
+
+ static class ProtoTypeConversionsFactory implements TypeConversionsFactory {
+ @Override
+ public TypeConversion<Type> createTypeConversion(boolean returnRawTypes) {
+ return new ProtoConvertType(returnRawTypes);
+ }
+
+ @Override
+ public TypeConversion<StackManipulation> createGetterConversions(StackManipulation readValue) {
+ return new ProtoConvertValueForGetter(readValue);
+ }
+
+ @Override
+ public TypeConversion<StackManipulation> createSetterConversions(StackManipulation readValue) {
+ return new ProtoConvertValueForSetter(readValue);
+ }
+ }
+
+ // The list of getters for a class is cached, so we only create the classes the first time
+ // getSetters is called.
+ private static final Map<ClassWithSchema, List<FieldValueGetter>> CACHED_GETTERS =
+ Maps.newConcurrentMap();
+
+ /**
+ * Return the list of {@link FieldValueGetter}s for a Java Bean class
+ *
+ * <p>The returned list is ordered by the order of fields in the schema.
+ */
+ public static List<FieldValueGetter> getGetters(
+ Class<?> clazz,
+ Schema schema,
+ FieldValueTypeSupplier fieldValueTypeSupplier,
+ TypeConversionsFactory typeConversionsFactory) {
+ Multimap<String, Method> methods = ReflectUtils.getMethodsMap(clazz);
+ return CACHED_GETTERS.computeIfAbsent(
+ ClassWithSchema.create(clazz, schema),
+ c -> {
+ List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+ return types.stream()
+ .map(
+ t ->
+ createGetter(
+ t,
+ typeConversionsFactory,
+ clazz,
+ methods,
+ schema.getField(t.getName()),
+ fieldValueTypeSupplier))
+ .collect(Collectors.toList());
+ });
+ }
+
+ static class OneOfFieldValueGetter<ProtoT extends MessageLite>
+ implements FieldValueGetter<ProtoT, OneOfType.Value> {
+ private final String name;
+ private final Supplier<Method> getCaseMethod;
+ private final Map<Integer, FieldValueGetter<ProtoT, ?>> getterMethodMap;
+ private final OneOfType oneOfType;
+
+ public OneOfFieldValueGetter(
+ String name,
+ Supplier<Method> getCaseMethod,
+ Map<Integer, FieldValueGetter<ProtoT, ?>> getterMethodMap,
+ OneOfType oneOfType) {
+ this.name = name;
+ this.getCaseMethod = getCaseMethod;
+ this.getterMethodMap = getterMethodMap;
+ this.oneOfType = oneOfType;
+ }
+
+ @Nullable
+ @Override
+ public Value get(ProtoT object) {
+ try {
+ EnumLite caseValue = (EnumLite) getCaseMethod.get().invoke(object);
+ if (caseValue.getNumber() == 0) {
+ return null;
+ } else {
+ Object value = getterMethodMap.get(caseValue.getNumber()).get(object);
+ return oneOfType.createValue(
+ oneOfType.getCaseEnumType().valueOf(caseValue.getNumber()), value);
+ }
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+ }
+
+ private static FieldValueGetter createGetter(
+ FieldValueTypeInformation fieldValueTypeInformation,
+ TypeConversionsFactory typeConversionsFactory,
+ Class clazz,
+ Multimap<String, Method> methods,
+ Field field,
+ FieldValueTypeSupplier fieldValueTypeSupplier) {
+ if (field.getType().isLogicalType(OneOfType.IDENTIFIER)) {
+ OneOfType oneOfType = field.getType().getLogicalType(OneOfType.class);
+
+ // The case accessor method in the proto is named getOneOfNameCase.
+ Method caseMethod =
+ getProtoGetter(
+ methods,
+ field.getName() + "_case",
+ FieldType.logicalType(oneOfType.getCaseEnumType()));
+ Map<Integer, FieldValueGetter> oneOfGetters = Maps.newHashMap();
+ Map<String, FieldValueTypeInformation> oneOfFieldTypes =
+ fieldValueTypeSupplier.get(clazz, oneOfType.getOneOfSchema()).stream()
+ .collect(Collectors.toMap(FieldValueTypeInformation::getName, f -> f));
+ for (Field oneOfField : oneOfType.getOneOfSchema().getFields()) {
+ int protoFieldIndex = getFieldNumber(oneOfField.getType());
+ FieldValueGetter oneOfFieldGetter =
+ createGetter(
+ oneOfFieldTypes.get(oneOfField.getName()),
+ typeConversionsFactory,
+ clazz,
+ methods,
+ oneOfField,
+ fieldValueTypeSupplier);
+ oneOfGetters.put(protoFieldIndex, oneOfFieldGetter);
+ }
+ return new OneOfFieldValueGetter(
+ field.getName(),
+ (Supplier<Method> & Serializable) () -> caseMethod,
+ oneOfGetters,
+ oneOfType);
+ } else {
+ return JavaBeanUtils.createGetter(fieldValueTypeInformation, typeConversionsFactory);
+ }
+ }
+
+ private static Class getProtoGeneratedBuilder(Class<?> clazz) {
+ String builderClassName = clazz.getName() + "$Builder";
+ try {
+ return Class.forName(builderClassName);
+ } catch (ClassNotFoundException e) {
+ return null;
+ }
+ }
+
+ static Method getProtoSetter(Multimap<String, Method> methods, String name, FieldType fieldType) {
+ final TypeDescriptor<MessageLite.Builder> builderDescriptor =
+ TypeDescriptor.of(MessageLite.Builder.class);
+ return methods.get(protoSetterName(name, fieldType)).stream()
+ // Setter methods take only a single parameter.
+ .filter(m -> m.getParameterCount() == 1)
+ // For nested types, we don't use the version that takes a builder.
+ .filter(
+ m -> !TypeDescriptor.of(m.getGenericParameterTypes()[0]).isSubtypeOf(builderDescriptor))
+ .findAny()
+ .orElseThrow(IllegalArgumentException::new);
+ }
+
+ static Method getProtoGetter(Multimap<String, Method> methods, String name, FieldType fieldType) {
+ return methods.get(protoGetterName(name, fieldType)).stream()
+ .filter(m -> m.getParameterCount() == 0)
+ .findAny()
+ .orElseThrow(IllegalArgumentException::new);
+ }
+
+ @Nullable
+ public static SchemaUserTypeCreator getBuilderCreator(
+ Class<?> protoClass, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
+ Class<?> builderClass = getProtoGeneratedBuilder(protoClass);
+ if (builderClass == null) {
+ return null;
+ }
+
+ List<FieldValueSetter> setters = Lists.newArrayListWithCapacity(schema.getFieldCount());
+ Multimap<String, Method> methods = ReflectUtils.getMethodsMap(builderClass);
+ for (Field field : schema.getFields()) {
+ if (field.getType().isLogicalType(OneOfType.IDENTIFIER)) {
+ OneOfType oneOfType = field.getType().getLogicalType(OneOfType.class);
+ Map<Integer, Method> oneOfMethods = Maps.newHashMap();
+ for (Field oneOfField : oneOfType.getOneOfSchema().getFields()) {
+ Method method = getProtoSetter(methods, oneOfField.getName(), oneOfField.getType());
+ oneOfMethods.put(getFieldNumber(oneOfField.getType()), method);
+ }
+ setters.add(
+ new ProtoOneOfSetter(
+ (Function<Integer, Method> & Serializable) oneOfMethods::get, field.getName()));
+ } else {
+ Method method = getProtoSetter(methods, field.getName(), field.getType());
+ setters.add(
+ JavaBeanUtils.createSetter(
+ FieldValueTypeInformation.forSetter(method, protoSetterPrefix(field.getType())),
+ new ProtoTypeConversionsFactory()));
+ }
+ }
+ return createBuilderCreator(protoClass, builderClass, setters, schema);
+ }
+
+ static class ProtoOneOfSetter<BuilderT extends MessageLite.Builder>
+ implements FieldValueSetter<BuilderT, OneOfType.Value> {
+ private final Function<Integer, Method> methods;
+ private final String name;
+
+ ProtoOneOfSetter(Function<Integer, Method> methods, String name) {
+ this.methods = methods;
+ this.name = name;
+ }
+
+ @Override
+ public void set(BuilderT builder, OneOfType.Value oneOfValue) {
+ Method method = methods.apply(oneOfValue.getCaseType().getValue());
+ try {
+ method.invoke(builder, oneOfValue.getValue());
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+ }
+
+ static SchemaUserTypeCreator createBuilderCreator(
+ Class<?> protoClass, Class<?> builderClass, List<FieldValueSetter> setters, Schema schema) {
+ try {
+ DynamicType.Builder<Supplier> builder =
+ BYTE_BUDDY
+ .with(new InjectPackageStrategy(builderClass))
+ .subclass(Supplier.class)
+ .method(ElementMatchers.named("get"))
+ .intercept(new BuilderSupplier(protoClass));
+ Supplier supplier =
+ builder
+ .make()
+ .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded()
+ .getDeclaredConstructor()
+ .newInstance();
+ return new ProtoCreatorFactory(supplier, setters);
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException(
+ "Unable to generate a creator for class " + builderClass + " with schema " + schema);
+ }
+ }
+
+ static class ProtoCreatorFactory implements SchemaUserTypeCreator {
+ private final Supplier<? extends MessageLite.Builder> builderCreator;
+ private final List<FieldValueSetter> setters;
+
+ public ProtoCreatorFactory(
+ Supplier<? extends MessageLite.Builder> builderCreator, List<FieldValueSetter> setters) {
+ this.builderCreator = builderCreator;
+ this.setters = setters;
+ }
+
+ @Override
+ public Object create(Object... params) {
+ MessageLite.Builder builder = builderCreator.get();
+ for (int i = 0; i < params.length; ++i) {
+ setters.get(i).set(builder, params[i]);
+ }
+ return builder.build();
+ }
+ }
+
+ static class BuilderSupplier implements Implementation {
+ private final Class<?> protoClass;
+
+ public BuilderSupplier(Class<?> protoClass) {
+ this.protoClass = protoClass;
+ }
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ ForLoadedType loadedProto = new ForLoadedType(protoClass);
+ return (methodVisitor, implementationContext, instrumentedMethod) -> {
+ // this + method parameters.
+ int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+ // Create the builder object by calling ProtoClass.newBuilder().
+ StackManipulation stackManipulation =
+ new StackManipulation.Compound(
+ MethodInvocation.invoke(
+ loadedProto
+ .getDeclaredMethods()
+ .filter(
+ ElementMatchers.named("newBuilder")
+ .and(ElementMatchers.takesArguments(0)))
+ .getOnly()),
+ MethodReturn.REFERENCE);
+ StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+ return new ByteCodeAppender.Size(size.getMaximalSize(), numLocals);
+ };
+ }
+ }
+}
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
new file mode 100644
index 0000000..47a928c
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static org.apache.beam.sdk.extensions.protobuf.ProtoByteBuddyUtils.getProtoGetter;
+
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.extensions.protobuf.ProtoByteBuddyUtils.ProtoTypeConversionsFactory;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+
+@Experimental(Kind.SCHEMAS)
+public class ProtoMessageSchema extends GetterBasedSchemaProvider {
+
+ private static final class ProtoClassFieldValueTypeSupplier implements FieldValueTypeSupplier {
+ @Override
+ public List<FieldValueTypeInformation> get(Class<?> clazz) {
+ throw new RuntimeException("Unexpected call.");
+ }
+
+ @Override
+ public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+ Multimap<String, Method> methods = ReflectUtils.getMethodsMap(clazz);
+ List<FieldValueTypeInformation> types =
+ Lists.newArrayListWithCapacity(schema.getFieldCount());
+ for (Field field : schema.getFields()) {
+ if (field.getType().isLogicalType(OneOfType.IDENTIFIER)) {
+ // This is a OneOf. Look for the getters for each OneOf option.
+ OneOfType oneOfType = field.getType().getLogicalType(OneOfType.class);
+ Map<String, FieldValueTypeInformation> oneOfTypes = Maps.newHashMap();
+ for (Field oneOfField : oneOfType.getOneOfSchema().getFields()) {
+ Method method = getProtoGetter(methods, oneOfField.getName(), oneOfField.getType());
+ oneOfTypes.put(
+ oneOfField.getName(),
+ FieldValueTypeInformation.forGetter(method).withName(field.getName()));
+ }
+ // Add an entry that encapsulates information about all possible getters.
+ types.add(
+ FieldValueTypeInformation.forOneOf(
+ field.getName(), field.getType().getNullable(), oneOfTypes)
+ .withName(field.getName()));
+ } else {
+ // This is a simple field. Add the getter.
+ Method method = getProtoGetter(methods, field.getName(), field.getType());
+ types.add(FieldValueTypeInformation.forGetter(method).withName(field.getName()));
+ }
+ }
+ return types;
+ }
+ }
+
+ @Nullable
+ @Override
+ public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
+ checkForDynamicType(typeDescriptor);
+ return ProtoSchemaTranslator.getSchema((Class<Message>) typeDescriptor.getRawType());
+ }
+
+ @Override
+ public List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
+ return ProtoByteBuddyUtils.getGetters(
+ targetClass,
+ schema,
+ new ProtoClassFieldValueTypeSupplier(),
+ new ProtoTypeConversionsFactory());
+ }
+
+ @Override
+ public List<FieldValueTypeInformation> fieldValueTypeInformations(
+ Class<?> targetClass, Schema schema) {
+ return JavaBeanUtils.getFieldTypes(targetClass, schema, new ProtoClassFieldValueTypeSupplier());
+ }
+
+ @Override
+ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
+ SchemaUserTypeCreator creator =
+ ProtoByteBuddyUtils.getBuilderCreator(
+ targetClass, schema, new ProtoClassFieldValueTypeSupplier());
+ if (creator == null) {
+ throw new RuntimeException("Cannot create creator for " + targetClass);
+ }
+ return creator;
+ }
+
+ private <T> void checkForDynamicType(TypeDescriptor<T> typeDescriptor) {
+ if (typeDescriptor.getRawType().equals(DynamicMessage.class)) {
+ throw new RuntimeException(
+ "DynamicMessage is not allowed for the standard ProtoSchemaProvider, use ProtoDynamicMessageSchema instead.");
+ }
+ }
+}
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
new file mode 100644
index 0000000..8952f9a
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.withFieldNumber;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMITIVE_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMITIVE_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMITIVE_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_BOOL;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_INT32;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_PRIMITIVE;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_STRING;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_BOOL;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_INT32;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_PRIMITIVE;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_STRING;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.OUTER_ONEOF_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.OUTER_ONEOF_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.OUTER_ONEOF_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage.Enum;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ProtoMessageSchemaTest {
+
+ @Test
+ public void testPrimitiveSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(Primitive.class));
+ assertEquals(PRIMITIVE_SCHEMA, schema);
+ }
+
+ @Test
+ public void testPrimitiveProtoToRow() {
+ SerializableFunction<Primitive, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(Primitive.class));
+ assertEquals(PRIMITIVE_ROW, toRow.apply(PRIMITIVE_PROTO));
+ }
+
+ @Test
+ public void testPrimitiveRowToProto() {
+ SerializableFunction<Row, Primitive> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(Primitive.class));
+ assertEquals(PRIMITIVE_PROTO, fromRow.apply(PRIMITIVE_ROW));
+ }
+
+ @Test
+ public void testRepeatedSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(RepeatPrimitive.class));
+ assertEquals(REPEATED_SCHEMA, schema);
+ }
+
+ @Test
+ public void testRepeatedProtoToRow() {
+ SerializableFunction<RepeatPrimitive, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
+ assertEquals(REPEATED_ROW, toRow.apply(REPEATED_PROTO));
+ }
+
+ @Test
+ public void testRepeatedRowToProto() {
+ SerializableFunction<Row, RepeatPrimitive> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
+ assertEquals(REPEATED_PROTO, fromRow.apply(REPEATED_ROW));
+ }
+
+ // Test map type
+ @Test
+ public void testMapSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(MapPrimitive.class));
+ assertEquals(MAP_PRIMITIVE_SCHEMA, schema);
+ }
+
+ @Test
+ public void testMapProtoToRow() {
+ SerializableFunction<MapPrimitive, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(MapPrimitive.class));
+ assertEquals(MAP_PRIMITIVE_ROW, toRow.apply(MAP_PRIMITIVE_PROTO));
+ }
+
+ @Test
+ public void testMapRowToProto() {
+ SerializableFunction<Row, MapPrimitive> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(MapPrimitive.class));
+ assertEquals(MAP_PRIMITIVE_PROTO, fromRow.apply(MAP_PRIMITIVE_ROW));
+ }
+
+ @Test
+ public void testNestedSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(Nested.class));
+ assertEquals(NESTED_SCHEMA, schema);
+ }
+
+ @Test
+ public void testNestedProtoToRow() {
+ SerializableFunction<Nested, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(Nested.class));
+ assertEquals(NESTED_ROW, toRow.apply(NESTED_PROTO));
+ }
+
+ @Test
+ public void testNestedRowToProto() {
+ SerializableFunction<Row, Nested> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(Nested.class));
+ assertEquals(NESTED_PROTO, fromRow.apply(NESTED_ROW));
+ }
+
+ @Test
+ public void testOneOfSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(OneOf.class));
+ assertEquals(ONEOF_SCHEMA, schema);
+ }
+
+ @Test
+ public void testOneOfProtoToRow() {
+ SerializableFunction<OneOf, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(OneOf.class));
+ assertEquals(ONEOF_ROW_INT32, toRow.apply(ONEOF_PROTO_INT32));
+ assertEquals(ONEOF_ROW_BOOL, toRow.apply(ONEOF_PROTO_BOOL));
+ assertEquals(ONEOF_ROW_STRING, toRow.apply(ONEOF_PROTO_STRING));
+ assertEquals(ONEOF_ROW_PRIMITIVE, toRow.apply(ONEOF_PROTO_PRIMITIVE));
+ }
+
+ @Test
+ public void testOneOfRowToProto() {
+ SerializableFunction<Row, OneOf> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(OneOf.class));
+ assertEquals(ONEOF_PROTO_INT32, fromRow.apply(ONEOF_ROW_INT32));
+ assertEquals(ONEOF_PROTO_BOOL, fromRow.apply(ONEOF_ROW_BOOL));
+ assertEquals(ONEOF_PROTO_STRING, fromRow.apply(ONEOF_ROW_STRING));
+ assertEquals(ONEOF_PROTO_PRIMITIVE, fromRow.apply(ONEOF_ROW_PRIMITIVE));
+ }
+
+ @Test
+ public void testOuterOneOfSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(OuterOneOf.class));
+ assertEquals(OUTER_ONEOF_SCHEMA, schema);
+ }
+
+ @Test
+ public void testOuterOneOfProtoToRow() {
+ SerializableFunction<OuterOneOf, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(OuterOneOf.class));
+ assertEquals(OUTER_ONEOF_ROW, toRow.apply(OUTER_ONEOF_PROTO));
+ }
+
+ @Test
+ public void testOuterOneOfRowToProto() {
+ SerializableFunction<Row, OuterOneOf> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(OuterOneOf.class));
+ assertEquals(OUTER_ONEOF_PROTO, fromRow.apply(OUTER_ONEOF_ROW));
+ }
+
+ private static final EnumerationType ENUM_TYPE =
+ EnumerationType.create(ImmutableMap.of("ZERO", 0, "TWO", 2, "THREE", 3));
+ private static final Schema ENUM_SCHEMA =
+ Schema.builder()
+ .addField("enum", withFieldNumber(FieldType.logicalType(ENUM_TYPE).withNullable(true), 1))
+ .build();
+ private static final Row ENUM_ROW =
+ Row.withSchema(ENUM_SCHEMA).addValues(ENUM_TYPE.valueOf("TWO")).build();
+ private static final EnumMessage ENUM_PROTO = EnumMessage.newBuilder().setEnum(Enum.TWO).build();
+
+ @Test
+ public void testEnumSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(EnumMessage.class));
+ assertEquals(ENUM_SCHEMA, schema);
+ }
+
+ @Test
+ public void testEnumProtoToRow() {
+ SerializableFunction<EnumMessage, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(EnumMessage.class));
+ assertEquals(ENUM_ROW, toRow.apply(ENUM_PROTO));
+ }
+
+ @Test
+ public void testEnumRowToProto() {
+ SerializableFunction<Row, EnumMessage> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(EnumMessage.class));
+ assertEquals(ENUM_PROTO, fromRow.apply(ENUM_ROW));
+ }
+
+ @Test
+ public void testWktMessageSchema() {
+ Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(WktMessage.class));
+ assertEquals(WKT_MESSAGE_SCHEMA, schema);
+ }
+
+ @Test
+ public void testWktProtoToRow() {
+ SerializableFunction<WktMessage, Row> toRow =
+ new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(WktMessage.class));
+ assertEquals(WKT_MESSAGE_ROW, toRow.apply(WKT_MESSAGE_PROTO));
+ }
+
+ @Test
+ public void testWktRowToProto() {
+ SerializableFunction<Row, WktMessage> fromRow =
+ new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(WktMessage.class));
+ assertEquals(WKT_MESSAGE_PROTO, fromRow.apply(WKT_MESSAGE_ROW));
+ }
+}
diff --git a/sdks/java/extensions/protobuf/src/test/resources/README.md b/sdks/java/extensions/protobuf/src/test/resources/README.md
new file mode 100644
index 0000000..79083f5
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/resources/README.md
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+This recreates the proto descriptor set included in this resource directory.
+
+```bash
+export PROTO_INCLUDE=<proto_include_dir>
+```
+Execute the following command to create the pb files, in the beam root folder:
+
+```bash
+protoc \
+ -Isdks/java/extensions/protobuf/src/test/resources/ \
+ -I$PROTO_INCLUDE \
+ --descriptor_set_out=sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb \
+ --include_imports \
+ sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto
+```
diff --git a/sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb b/sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb
new file mode 100644
index 0000000..4e97ad0
Binary files /dev/null and b/sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb differ
diff --git a/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/option.proto b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/option.proto
new file mode 100644
index 0000000..ca40119
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/option.proto
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package test.option.v1;
+
+import "google/protobuf/descriptor.proto";
+
+extend google.protobuf.FileOptions {
+ double fileoption_double = 66666700;
+ float fileoption_float = 66666701;
+ int32 fileoption_int32 = 66666702;
+ int64 fileoption_int64 = 66666703;
+ uint32 fileoption_uint32 = 66666704;
+ uint64 fileoption_uint64 = 66666705;
+ sint32 fileoption_sint32 = 66666706;
+ sint64 fileoption_sint64 = 66666707;
+ fixed32 fileoption_fixed32 = 66666708;
+ fixed64 fileoption_fixed64 = 66666709;
+ sfixed32 fileoption_sfixed32 = 66666710;
+ sfixed64 fileoption_sfixed64 = 66666711;
+ bool fileoption_bool = 66666712;
+ string fileoption_string = 66666713;
+ bytes fileoption_bytes = 66666714;
+ OptionMessage fileoption_message = 66666715;
+ OptionEnum fileoption_enum = 66666716;
+}
+
+extend google.protobuf.MessageOptions {
+ double messageoption_double = 66666700;
+ float messageoption_float = 66666701;
+ int32 messageoption_int32 = 66666702;
+ int64 messageoption_int64 = 66666703;
+ uint32 messageoption_uint32 = 66666704;
+ uint64 messageoption_uint64 = 66666705;
+ sint32 messageoption_sint32 = 66666706;
+ sint64 messageoption_sint64 = 66666707;
+ fixed32 messageoption_fixed32 = 66666708;
+ fixed64 messageoption_fixed64 = 66666709;
+ sfixed32 messageoption_sfixed32 = 66666710;
+ sfixed64 messageoption_sfixed64 = 66666711;
+ bool messageoption_bool = 66666712;
+ string messageoption_string = 66666713;
+ bytes messageoption_bytes = 66666714;
+ OptionMessage messageoption_message = 66666715;
+ OptionEnum messageoption_enum = 66666716;
+
+ repeated double messageoption_repeated_double = 66666800;
+ repeated float messageoption_repeated_float = 66666801;
+ repeated int32 messageoption_repeated_int32 = 66666802;
+ repeated int64 messageoption_repeated_int64 = 66666803;
+ repeated uint32 messageoption_repeated_uint32 = 66666804;
+ repeated uint64 messageoption_repeated_uint64 = 66666805;
+ repeated sint32 messageoption_repeated_sint32 = 66666806;
+ repeated sint64 messageoption_repeated_sint64 = 66666807;
+ repeated fixed32 messageoption_repeated_fixed32 = 66666808;
+ repeated fixed64 messageoption_repeated_fixed64 = 66666809;
+ repeated sfixed32 messageoption_repeated_sfixed32 = 66666810;
+ repeated sfixed64 messageoption_repeated_sfixed64 = 66666811;
+ repeated bool messageoption_repeated_bool = 66666812;
+ repeated string messageoption_repeated_string = 66666813;
+ repeated bytes messageoption_repeated_bytes = 66666814;
+ repeated OptionMessage messageoption_repeated_message = 66666815;
+ repeated OptionEnum messageoption_repeated_enum = 66666816;
+}
+
+extend google.protobuf.FieldOptions {
+ double fieldoption_double = 66666700;
+ float fieldoption_float = 66666701;
+ int32 fieldoption_int32 = 66666702;
+ int64 fieldoption_int64 = 66666703;
+ uint32 fieldoption_uint32 = 66666704;
+ uint64 fieldoption_uint64 = 66666705;
+ sint32 fieldoption_sint32 = 66666706;
+ sint64 fieldoption_sint64 = 66666707;
+ fixed32 fieldoption_fixed32 = 66666708;
+ fixed64 fieldoption_fixed64 = 66666709;
+ sfixed32 fieldoption_sfixed32 = 66666710;
+ sfixed64 fieldoption_sfixed64 = 66666711;
+ bool fieldoption_bool = 66666712;
+ string fieldoption_string = 66666713;
+ bytes fieldoption_bytes = 66666714;
+ OptionMessage fieldoption_message = 66666715;
+ OptionEnum fieldoption_enum = 66666716;
+
+ repeated double fieldoption_repeated_double = 66666800;
+ repeated float fieldoption_repeated_float = 66666801;
+ repeated int32 fieldoption_repeated_int32 = 66666802;
+ repeated int64 fieldoption_repeated_int64 = 66666803;
+ repeated uint32 fieldoption_repeated_uint32 = 66666804;
+ repeated uint64 fieldoption_repeated_uint64 = 66666805;
+ repeated sint32 fieldoption_repeated_sint32 = 66666806;
+ repeated sint64 fieldoption_repeated_sint64 = 66666807;
+ repeated fixed32 fieldoption_repeated_fixed32 = 66666808;
+ repeated fixed64 fieldoption_repeated_fixed64 = 66666809;
+ repeated sfixed32 fieldoption_repeated_sfixed32 = 66666810;
+ repeated sfixed64 fieldoption_repeated_sfixed64 = 66666811;
+ repeated bool fieldoption_repeated_bool = 66666812;
+ repeated string fieldoption_repeated_string = 66666813;
+ repeated bytes fieldoption_repeated_bytes = 66666814;
+ repeated OptionMessage fieldoption_repeated_message = 66666815;
+ repeated OptionEnum fieldoption_repeated_enum = 66666816;
+}
+
+enum OptionEnum {
+ DEFAULT = 0;
+ ENUM1 = 1;
+ ENUM2 = 2;
+}
+
+message OptionMessage {
+ string string = 1;
+ repeated string repeated_string = 2;
+
+ int32 int32 = 3;
+ repeated int32 repeated_int32 = 4;
+
+ int64 int64 = 5;
+
+ OptionEnum test_enum = 6;
+}
\ No newline at end of file
diff --git a/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto
new file mode 100644
index 0000000..1750ddf
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "test/option/v1/option.proto";
+
+package test.option.v1;
+
+message MessageWithOptions {
+ string test_name = 1;
+ int32 test_index = 2;
+
+ int32 field_with_fieldoption_double = 700 [(test.option.v1.fieldoption_double) = 100.1];
+ int32 field_with_fieldoption_float = 701 [(test.option.v1.fieldoption_float) = 101.2];
+ int32 field_with_fieldoption_int32 = 702 [(test.option.v1.fieldoption_int32) = 102];
+ int32 field_with_fieldoption_int64 = 703 [(test.option.v1.fieldoption_int64) = 103];
+ int32 field_with_fieldoption_uint32 = 704 [(test.option.v1.fieldoption_uint32) = 104];
+ int32 field_with_fieldoption_uint64 = 705 [(test.option.v1.fieldoption_uint64) = 105];
+ int32 field_with_fieldoption_sint32 = 706 [(test.option.v1.fieldoption_sint32) = 106];
+ int32 field_with_fieldoption_sint64 = 707 [(test.option.v1.fieldoption_sint64) = 107];
+ int32 field_with_fieldoption_fixed32 = 708;
+ int32 field_with_fieldoption_fixed64 = 709;
+ int32 field_with_fieldoption_sfixed32 = 710;
+ int32 field_with_fieldoption_sfixed64 = 711;
+ int32 field_with_fieldoption_bool = 712 [(test.option.v1.fieldoption_bool) = true];
+ int32 field_with_fieldoption_string = 713 [(test.option.v1.fieldoption_string) = "Oh yeah"];
+ int32 field_with_fieldoption_bytes = 714;
+ int32 field_with_fieldoption_message = 715;
+ int32 field_with_fieldoption_enum = 716 [(test.option.v1.fieldoption_enum) = ENUM1];
+
+ int32 field_with_fieldoption_repeated_double = 800;
+ int32 field_with_fieldoption_repeated_float = 801;
+ int32 field_with_fieldoption_repeated_int32 = 802;
+ int32 field_with_fieldoption_repeated_int64 = 803;
+ int32 field_with_fieldoption_repeated_uint32 = 804;
+ int32 field_with_fieldoption_repeated_uint64 = 805;
+ int32 field_with_fieldoption_repeated_sint32 = 806;
+ int32 field_with_fieldoption_repeated_sint64 = 807;
+ int32 field_with_fieldoption_repeated_fixed32 = 808;
+ int32 field_with_fieldoption_repeated_fixed64 = 809;
+ int32 field_with_fieldoption_repeated_sfixed32 = 810;
+ int32 field_with_fieldoption_repeated_sfixed64 = 811;
+ int32 field_with_fieldoption_repeated_bool = 812;
+ int32 field_with_fieldoption_repeated_string = 813 [(test.option.v1.fieldoption_repeated_string) = "Oh yeah",
+ (test.option.v1.fieldoption_repeated_string) = "Oh no"];
+ int32 field_with_fieldoption_repeated_bytes = 814;
+ int32 field_with_fieldoption_repeated_message = 815;
+ int32 field_with_fieldoption_repeated_enum = 816;
+
+}
+