You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2019/12/25 18:21:01 UTC

[beam] branch master updated: Merge pull request #10449: [BEAM-7274] Implement the Protobuf schema provider for compiled protocol buffers

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bb3c63b  Merge pull request #10449: [BEAM-7274] Implement the Protobuf schema provider for compiled protocol buffers
bb3c63b is described below

commit bb3c63b647956a3c0dd055a786aa252c30fdae50
Author: reuvenlax <re...@google.com>
AuthorDate: Wed Dec 25 08:20:50 2019 -1000

    Merge pull request #10449: [BEAM-7274] Implement the Protobuf schema provider for compiled protocol buffers
---
 .../apache/beam/sdk/schemas/FieldValueSetter.java  |   2 +-
 .../sdk/schemas/FieldValueTypeInformation.java     |  30 +-
 .../beam/sdk/schemas/FromRowUsingCreator.java      |  48 +-
 .../beam/sdk/schemas/transforms/Convert.java       |   2 +-
 .../beam/sdk/schemas/utils/AvroByteBuddyUtils.java |   2 +-
 .../beam/sdk/schemas/utils/ByteBuddyUtils.java     |   3 +-
 .../beam/sdk/schemas/utils/JavaBeanUtils.java      |  35 +-
 .../apache/beam/sdk/schemas/utils/POJOUtils.java   |  12 +-
 .../beam/sdk/schemas/utils/ReflectUtils.java       |  36 +-
 .../org/apache/beam/sdk/values/RowWithGetters.java |  10 +
 .../extensions/protobuf/ProtoByteBuddyUtils.java   | 663 +++++++++++++++++++++
 .../extensions/protobuf/ProtoMessageSchema.java    | 125 ++++
 .../protobuf/ProtoMessageSchemaTest.java           | 249 ++++++++
 .../protobuf/src/test/resources/README.md          |  34 ++
 .../beam/sdk/extensions/protobuf/test_option_v1.pb | Bin 0 -> 18745 bytes
 .../src/test/resources/test/option/v1/option.proto | 137 +++++
 .../src/test/resources/test/option/v1/simple.proto |  67 +++
 17 files changed, 1381 insertions(+), 74 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
index 5d9e82b..db7caaa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.annotations.Internal;
  *
  * <p>An interface to set a field of a class.
  *
- * <p>Implementations of this interface are generated at runtime to map Row fields back into objet
+ * <p>Implementations of this interface are generated at runtime to map Row fields back into object
  * fields.
  */
 @Internal
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
index a6ecc45..33ed888 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -22,7 +22,10 @@ import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -47,6 +50,8 @@ public abstract class FieldValueTypeInformation implements Serializable {
   @Nullable
   public abstract Method getMethod();
 
+  public abstract Map<String, FieldValueTypeInformation> getOneOfTypes();
+
   /** If the field is a container type, returns the element type. */
   @Nullable
   public abstract FieldValueTypeInformation getElementType();
@@ -62,7 +67,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
   abstract Builder toBuilder();
 
   @AutoValue.Builder
-  abstract static class Builder {
+  public abstract static class Builder {
     public abstract Builder setName(String name);
 
     public abstract Builder setNullable(boolean nullable);
@@ -75,6 +80,8 @@ public abstract class FieldValueTypeInformation implements Serializable {
 
     public abstract Builder setMethod(@Nullable Method method);
 
+    public abstract Builder setOneOfTypes(Map<String, FieldValueTypeInformation> oneOfTypes);
+
     public abstract Builder setElementType(@Nullable FieldValueTypeInformation elementType);
 
     public abstract Builder setMapKeyType(@Nullable FieldValueTypeInformation mapKeyType);
@@ -84,6 +91,22 @@ public abstract class FieldValueTypeInformation implements Serializable {
     abstract FieldValueTypeInformation build();
   }
 
+  public static FieldValueTypeInformation forOneOf(
+      String name, boolean nullable, Map<String, FieldValueTypeInformation> oneOfTypes) {
+    final TypeDescriptor<OneOfType.Value> typeDescriptor = TypeDescriptor.of(OneOfType.Value.class);
+    return new AutoValue_FieldValueTypeInformation.Builder()
+        .setName(name)
+        .setNullable(nullable)
+        .setType(typeDescriptor)
+        .setRawType(typeDescriptor.getRawType())
+        .setField(null)
+        .setElementType(null)
+        .setMapKeyType(null)
+        .setMapValueType(null)
+        .setOneOfTypes(oneOfTypes)
+        .build();
+  }
+
   public static FieldValueTypeInformation forField(Field field) {
     TypeDescriptor type = TypeDescriptor.of(field.getGenericType());
     return new AutoValue_FieldValueTypeInformation.Builder()
@@ -95,6 +118,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
         .setElementType(getIterableComponentType(field))
         .setMapKeyType(getMapKeyType(field))
         .setMapValueType(getMapValueType(field))
+        .setOneOfTypes(Collections.emptyMap())
         .build();
   }
 
@@ -119,6 +143,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
         .setElementType(getIterableComponentType(type))
         .setMapKeyType(getMapKeyType(type))
         .setMapValueType(getMapValueType(type))
+        .setOneOfTypes(Collections.emptyMap())
         .build();
   }
 
@@ -148,6 +173,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
         .setElementType(getIterableComponentType(type))
         .setMapKeyType(getMapKeyType(type))
         .setMapValueType(getMapValueType(type))
+        .setOneOfTypes(Collections.emptyMap())
         .build();
   }
 
@@ -175,6 +201,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
         .setElementType(getIterableComponentType(componentType))
         .setMapKeyType(getMapKeyType(componentType))
         .setMapValueType(getMapValueType(componentType))
+        .setOneOfTypes(Collections.emptyMap())
         .build();
   }
 
@@ -217,6 +244,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
         .setElementType(getIterableComponentType(mapType))
         .setMapKeyType(getMapKeyType(mapType))
         .setMapValueType(getMapValueType(mapType))
+        .setOneOfTypes(Collections.emptyMap())
         .build();
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 61c0d05..499991f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -28,6 +28,7 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.RowWithGetters;
@@ -80,13 +81,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
       FieldValueTypeInformation typeInformation = checkNotNull(typeInformations.get(i));
       params[i] =
           fromValue(
-              type,
-              row.getValue(i),
-              typeInformation.getRawType(),
-              typeInformation.getElementType(),
-              typeInformation.getMapKeyType(),
-              typeInformation.getMapValueType(),
-              typeFactory);
+              type, row.getValue(i), typeInformation.getRawType(), typeInformation, typeFactory);
     }
 
     SchemaUserTypeCreator creator = schemaTypeCreatorFactory.create(clazz, schema);
@@ -99,10 +94,11 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
       FieldType type,
       ValueT value,
       Type fieldType,
-      FieldValueTypeInformation elementType,
-      FieldValueTypeInformation keyType,
-      FieldValueTypeInformation valueType,
+      FieldValueTypeInformation fieldValueTypeInformation,
       Factory<List<FieldValueTypeInformation>> typeFactory) {
+    FieldValueTypeInformation elementType = fieldValueTypeInformation.getElementType();
+    FieldValueTypeInformation keyType = fieldValueTypeInformation.getMapKeyType();
+    FieldValueTypeInformation valueType = fieldValueTypeInformation.getMapValueType();
     if (value == null) {
       return null;
     }
@@ -127,6 +123,22 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
               valueType,
               typeFactory);
     } else {
+      if (type.getTypeName().isLogicalType()
+          && OneOfType.IDENTIFIER.equals(type.getLogicalType().getIdentifier())) {
+        OneOfType oneOfType = type.getLogicalType(OneOfType.class);
+        OneOfType.Value oneOfValue = oneOfType.toInputType((Row) value);
+        FieldValueTypeInformation oneOfFieldValueTypeInformation =
+            checkNotNull(
+                fieldValueTypeInformation.getOneOfTypes().get(oneOfValue.getCaseType().toString()));
+        Object fromValue =
+            fromValue(
+                oneOfValue.getFieldType(),
+                oneOfValue.getValue(),
+                oneOfFieldValueTypeInformation.getRawType(),
+                oneOfFieldValueTypeInformation,
+                typeFactory);
+        return (ValueT) oneOfType.createValue(oneOfValue.getCaseType(), fromValue);
+      }
       return value;
     }
   }
@@ -156,9 +168,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
                 elementType,
                 element,
                 elementTypeInformation.getType().getType(),
-                elementTypeInformation.getElementType(),
-                elementTypeInformation.getMapKeyType(),
-                elementTypeInformation.getMapValueType(),
+                elementTypeInformation,
                 typeFactory));
   }
 
@@ -175,9 +185,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
                 elementType,
                 element,
                 elementTypeInformation.getType().getType(),
-                elementTypeInformation.getElementType(),
-                elementTypeInformation.getMapKeyType(),
-                elementTypeInformation.getMapValueType(),
+                elementTypeInformation,
                 typeFactory));
   }
 
@@ -196,18 +204,14 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
               keyType,
               entry.getKey(),
               keyTypeInformation.getType().getType(),
-              keyTypeInformation.getElementType(),
-              keyTypeInformation.getMapKeyType(),
-              keyTypeInformation.getMapValueType(),
+              keyTypeInformation,
               typeFactory);
       Object value =
           fromValue(
               valueType,
               entry.getValue(),
               valueTypeInformation.getType().getType(),
-              valueTypeInformation.getElementType(),
-              valueTypeInformation.getMapKeyType(),
-              valueTypeInformation.getMapValueType(),
+              valueTypeInformation,
               typeFactory);
       newMap.put(key, value);
     }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
index 5176def..5233b0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Convert.java
@@ -40,7 +40,7 @@ public class Convert {
    * Convert a {@link PCollection}{@literal <InputT>} into a {@link PCollection}{@literal <Row>}.
    *
    * <p>The input {@link PCollection} must have a schema attached. The output collection will have
-   * the same schema as the iput.
+   * the same schema as the input.
    */
   public static <InputT> PTransform<PCollection<InputT>, PCollection<Row>> toRows() {
     return to(Row.class);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
index fd7f601..436da6c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
@@ -54,7 +54,7 @@ class AvroByteBuddyUtils {
   static <T extends SpecificRecord> SchemaUserTypeCreator getCreator(
       Class<T> clazz, Schema schema) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
+        ClassWithSchema.create(clazz, schema), c -> createCreator(clazz, schema));
   }
 
   private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index 791dafb..c00d5d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -94,7 +94,6 @@ public class ByteBuddyUtils {
       new ForLoadedType(ReadableInstant.class);
   private static final ForLoadedType READABLE_PARTIAL_TYPE =
       new ForLoadedType(ReadablePartial.class);
-  private static final ForLoadedType OBJECT_TYPE = new ForLoadedType(Object.class);
   private static final ForLoadedType INTEGER_TYPE = new ForLoadedType(Integer.class);
   private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class);
   private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE =
@@ -134,7 +133,7 @@ public class ByteBuddyUtils {
 
   // Create a new FieldValueGetter subclass.
   @SuppressWarnings("unchecked")
-  static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
+  public static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
       ByteBuddy byteBuddy, Type objectType, Type fieldType) {
     TypeDescription.Generic getterGenericType =
         TypeDescription.Generic.Builder.parameterizedType(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 759d77d..e25342b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -102,7 +102,7 @@ public class JavaBeanUtils {
   public static List<FieldValueTypeInformation> getFieldTypes(
       Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
     return CACHED_FIELD_TYPES.computeIfAbsent(
-        new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
+        ClassWithSchema.create(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
   }
 
   // The list of getters for a class is cached, so we only create the classes the first time
@@ -121,7 +121,7 @@ public class JavaBeanUtils {
       FieldValueTypeSupplier fieldValueTypeSupplier,
       TypeConversionsFactory typeConversionsFactory) {
     return CACHED_GETTERS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return types.stream()
@@ -130,7 +130,7 @@ public class JavaBeanUtils {
         });
   }
 
-  private static <T> FieldValueGetter createGetter(
+  public static <T> FieldValueGetter createGetter(
       FieldValueTypeInformation typeInformation, TypeConversionsFactory typeConversionsFactory) {
     DynamicType.Builder<FieldValueGetter> builder =
         ByteBuddyUtils.subclassGetterInterface(
@@ -184,7 +184,7 @@ public class JavaBeanUtils {
       FieldValueTypeSupplier fieldValueTypeSupplier,
       TypeConversionsFactory typeConversionsFactory) {
     return CACHED_SETTERS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return types.stream()
@@ -193,14 +193,14 @@ public class JavaBeanUtils {
         });
   }
 
-  private static FieldValueSetter createSetter(
+  public static FieldValueSetter createSetter(
       FieldValueTypeInformation typeInformation, TypeConversionsFactory typeConversionsFactory) {
     DynamicType.Builder<FieldValueSetter> builder =
         ByteBuddyUtils.subclassSetterInterface(
             BYTE_BUDDY,
             typeInformation.getMethod().getDeclaringClass(),
             typeConversionsFactory.createTypeConversion(false).convert(typeInformation.getType()));
-    builder = implementSetterMethods(builder, typeInformation.getMethod(), typeConversionsFactory);
+    builder = implementSetterMethods(builder, typeInformation, typeConversionsFactory);
     try {
       return builder
           .make()
@@ -222,14 +222,13 @@ public class JavaBeanUtils {
 
   private static DynamicType.Builder<FieldValueSetter> implementSetterMethods(
       DynamicType.Builder<FieldValueSetter> builder,
-      Method method,
+      FieldValueTypeInformation fieldValueTypeInformation,
       TypeConversionsFactory typeConversionsFactory) {
-    FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method);
     return builder
         .method(ElementMatchers.named("name"))
-        .intercept(FixedValue.reference(javaTypeInformation.getName()))
+        .intercept(FixedValue.reference(fieldValueTypeInformation.getName()))
         .method(ElementMatchers.named("set"))
-        .intercept(new InvokeSetterInstruction(method, typeConversionsFactory));
+        .intercept(new InvokeSetterInstruction(fieldValueTypeInformation, typeConversionsFactory));
   }
 
   // The list of constructors for a class is cached, so we only create the classes the first time
@@ -244,7 +243,7 @@ public class JavaBeanUtils {
       FieldValueTypeSupplier fieldValueTypeSupplier,
       TypeConversionsFactory typeConversionsFactory) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return createConstructorCreator(
@@ -291,7 +290,7 @@ public class JavaBeanUtils {
       FieldValueTypeSupplier fieldValueTypeSupplier,
       TypeConversionsFactory typeConversionsFactory) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return createStaticCreator(clazz, creator, schema, types, typeConversionsFactory);
@@ -377,11 +376,13 @@ public class JavaBeanUtils {
   // Implements a method to write a public set out on an object.
   private static class InvokeSetterInstruction implements Implementation {
     // Setter method that will be invoked
-    private Method method;
+    private FieldValueTypeInformation fieldValueTypeInformation;
     private final TypeConversionsFactory typeConversionsFactory;
 
-    InvokeSetterInstruction(Method method, TypeConversionsFactory typeConversionsFactory) {
-      this.method = method;
+    InvokeSetterInstruction(
+        FieldValueTypeInformation fieldValueTypeInformation,
+        TypeConversionsFactory typeConversionsFactory) {
+      this.fieldValueTypeInformation = fieldValueTypeInformation;
       this.typeConversionsFactory = typeConversionsFactory;
     }
 
@@ -393,13 +394,13 @@ public class JavaBeanUtils {
     @Override
     public ByteCodeAppender appender(final Target implementationTarget) {
       return (methodVisitor, implementationContext, instrumentedMethod) -> {
-        FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method);
         // this + method parameters.
         int numLocals = 1 + instrumentedMethod.getParameters().size();
 
         // The instruction to read the field.
         StackManipulation readField = MethodVariableAccess.REFERENCE.loadFrom(2);
 
+        Method method = fieldValueTypeInformation.getMethod();
         boolean setterMethodReturnsVoid = method.getReturnType().equals(Void.TYPE);
         // Read the object onto the stack.
         StackManipulation stackManipulation =
@@ -409,7 +410,7 @@ public class JavaBeanUtils {
                 // Do any conversions necessary.
                 typeConversionsFactory
                     .createSetterConversions(readField)
-                    .convert(javaTypeInformation.getType()),
+                    .convert(fieldValueTypeInformation.getType()),
                 // Now update the field and return void.
                 MethodInvocation.invoke(new ForLoadedMethod(method)));
         if (!setterMethodReturnsVoid) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index a58ddf8..aa968b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -81,7 +81,7 @@ public class POJOUtils {
   public static List<FieldValueTypeInformation> getFieldTypes(
       Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
     return CACHED_FIELD_TYPES.computeIfAbsent(
-        new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
+        ClassWithSchema.create(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
   }
 
   // The list of getters for a class is cached, so we only create the classes the first time
@@ -96,7 +96,7 @@ public class POJOUtils {
       TypeConversionsFactory typeConversionsFactory) {
     // Return the getters ordered by their position in the schema.
     return CACHED_GETTERS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           List<FieldValueGetter> getters =
@@ -122,7 +122,7 @@ public class POJOUtils {
       FieldValueTypeSupplier fieldValueTypeSupplier,
       TypeConversionsFactory typeConversionsFactory) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return createSetFieldCreator(clazz, schema, types, typeConversionsFactory);
@@ -169,7 +169,7 @@ public class POJOUtils {
       FieldValueTypeSupplier fieldValueTypeSupplier,
       TypeConversionsFactory typeConversionsFactory) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return createConstructorCreator(
@@ -217,7 +217,7 @@ public class POJOUtils {
       FieldValueTypeSupplier fieldValueTypeSupplier,
       TypeConversionsFactory typeConversionsFactory) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return createStaticCreator(clazz, creator, schema, types, typeConversionsFactory);
@@ -323,7 +323,7 @@ public class POJOUtils {
       TypeConversionsFactory typeConversionsFactory) {
     // Return the setters, ordered by their position in the schema.
     return CACHED_SETTERS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
+        ClassWithSchema.create(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
           return types.stream()
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
index d56f0bd..08c494c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.schemas.utils;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.value.AutoValue;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -31,7 +32,6 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.Schema;
@@ -39,35 +39,21 @@ import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimaps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;
 
 /** A set of reflection helper methods. */
 public class ReflectUtils {
   /** Represents a class and a schema. */
-  public static class ClassWithSchema {
-    private final Class clazz;
-    private final Schema schema;
+  @AutoValue
+  public abstract static class ClassWithSchema {
+    public abstract Class getClazz();
 
-    public ClassWithSchema(Class clazz, Schema schema) {
-      this.clazz = clazz;
-      this.schema = schema;
-    }
+    public abstract Schema getSchema();
 
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      ClassWithSchema that = (ClassWithSchema) o;
-      return Objects.equals(clazz, that.clazz) && Objects.equals(schema, that.schema);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(clazz, schema);
+    public static ClassWithSchema create(Class clazz, Schema schema) {
+      return new AutoValue_ReflectUtils_ClassWithSchema(clazz, schema);
     }
   }
 
@@ -94,6 +80,10 @@ public class ReflectUtils {
         });
   }
 
+  public static Multimap<String, Method> getMethodsMap(Class clazz) {
+    return Multimaps.index(getMethods(clazz), Method::getName);
+  }
+
   @Nullable
   public static Constructor getAnnotatedConstructor(Class clazz) {
     return Arrays.stream(clazz.getDeclaredConstructors())
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
index ebf59b9..0d68709 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -122,6 +123,15 @@ public class RowWithGetters extends Row {
                   cacheKey, i -> getMapValue(type.getMapKeyType(), type.getMapValueType(), map))
           : (T) getMapValue(type.getMapKeyType(), type.getMapValueType(), map);
     } else {
+      if (type.isLogicalType(OneOfType.IDENTIFIER)) {
+        OneOfType oneOfType = type.getLogicalType(OneOfType.class);
+        OneOfType.Value oneOfValue = (OneOfType.Value) fieldValue;
+        Object convertedOneOfField =
+            getValue(oneOfValue.getFieldType(), oneOfValue.getValue(), null);
+        return (T)
+            oneOfType.toBaseType(
+                oneOfType.createValue(oneOfValue.getCaseType(), convertedOneOfField));
+      }
       return (T) fieldValue;
     }
   }
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java
new file mode 100644
index 0000000..6827fb1
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getFieldNumber;
+
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Duration;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.ProtocolMessageEnum;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.DurationNanos;
+import org.apache.beam.sdk.extensions.protobuf.ProtoSchemaLogicalTypes.TimestampNanos;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType.Value;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForSetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.InjectPackageStrategy;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversion;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.ByteBuddy;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.method.MethodDescription;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.type.TypeDescription;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.DynamicType;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.Implementation;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.StackManipulation;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.Assigner;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+
+public class ProtoByteBuddyUtils {
+  private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+  private static final TypeDescriptor<ByteString> BYTE_STRING_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(ByteString.class);
+  private static final TypeDescriptor<Timestamp> PROTO_TIMESTAMP_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(Timestamp.class);
+  private static final TypeDescriptor<Duration> PROTO_DURATION_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(Duration.class);
+  private static final TypeDescriptor<Int32Value> PROTO_INT32_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(Int32Value.class);
+  private static final TypeDescriptor<Int64Value> PROTO_INT64_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(Int64Value.class);
+  private static final TypeDescriptor<UInt32Value> PROTO_UINT32_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(UInt32Value.class);
+  private static final TypeDescriptor<UInt64Value> PROTO_UINT64_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(UInt64Value.class);
+  private static final TypeDescriptor<FloatValue> PROTO_FLOAT_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(FloatValue.class);
+  private static final TypeDescriptor<DoubleValue> PROTO_DOUBLE_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(DoubleValue.class);
+  private static final TypeDescriptor<BoolValue> PROTO_BOOL_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BoolValue.class);
+  private static final TypeDescriptor<StringValue> PROTO_STRING_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(StringValue.class);
+  private static final TypeDescriptor<BytesValue> PROTO_BYTES_VALUE_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BytesValue.class);
+
+  private static final ForLoadedType BYTE_STRING_TYPE = new ForLoadedType(ByteString.class);
+  private static final ForLoadedType BYTE_ARRAY_TYPE = new ForLoadedType(byte[].class);
+  private static final ForLoadedType PROTO_ENUM_TYPE = new ForLoadedType(ProtocolMessageEnum.class);
+  private static final ForLoadedType INTEGER_TYPE = new ForLoadedType(Integer.class);
+
+  private static final Map<TypeDescriptor<?>, ForLoadedType> WRAPPER_LOADED_TYPES =
+      ImmutableMap.<TypeDescriptor<?>, ForLoadedType>builder()
+          .put(PROTO_INT32_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(Int32Value.class))
+          .put(PROTO_INT64_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(Int64Value.class))
+          .put(PROTO_UINT32_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(UInt32Value.class))
+          .put(PROTO_UINT64_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(UInt64Value.class))
+          .put(PROTO_FLOAT_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(FloatValue.class))
+          .put(PROTO_DOUBLE_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(DoubleValue.class))
+          .put(PROTO_BOOL_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(BoolValue.class))
+          .put(PROTO_STRING_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(StringValue.class))
+          .put(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR, new ForLoadedType(BytesValue.class))
+          .build();
+
+  private static final Map<TypeName, String> PROTO_GETTER_SUFFIX =
+      ImmutableMap.of(
+          TypeName.ARRAY, "List",
+          TypeName.ITERABLE, "List",
+          TypeName.MAP, "Map");
+
+  private static final Map<TypeName, String> PROTO_SETTER_PREFIX =
+      ImmutableMap.of(
+          TypeName.ARRAY, "addAll",
+          TypeName.ITERABLE, "addAll",
+          TypeName.MAP, "putAll");
+  private static final String DEFAULT_PROTO_GETTER_PREFIX = "get";
+  private static final String DEFAULT_PROTO_SETTER_PREFIX = "set";
+
+  static String protoGetterName(String name, FieldType fieldType) {
+    final String camel = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, name);
+    return DEFAULT_PROTO_GETTER_PREFIX
+        + camel
+        + PROTO_GETTER_SUFFIX.getOrDefault(fieldType.getTypeName(), "");
+  }
+
+  static String protoSetterName(String name, FieldType fieldType) {
+    final String camel = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, name);
+    return protoSetterPrefix(fieldType) + camel;
+  }
+
+  static String protoSetterPrefix(FieldType fieldType) {
+    return PROTO_SETTER_PREFIX.getOrDefault(fieldType.getTypeName(), DEFAULT_PROTO_SETTER_PREFIX);
+  }
+
+  static class ProtoConvertType extends ConvertType {
+    ProtoConvertType(boolean returnRawValues) {
+      super(returnRawValues);
+    }
+
+    private static final Map<TypeDescriptor<?>, Class<?>> TYPE_OVERRIDES =
+        ImmutableMap.<TypeDescriptor<?>, Class<?>>builder()
+            .put(PROTO_TIMESTAMP_TYPE_DESCRIPTOR, Row.class)
+            .put(PROTO_DURATION_TYPE_DESCRIPTOR, Row.class)
+            .put(PROTO_INT32_VALUE_TYPE_DESCRIPTOR, Integer.class)
+            .put(PROTO_INT64_VALUE_TYPE_DESCRIPTOR, Long.class)
+            .put(PROTO_UINT32_VALUE_TYPE_DESCRIPTOR, Integer.class)
+            .put(PROTO_UINT64_VALUE_TYPE_DESCRIPTOR, Long.class)
+            .put(PROTO_FLOAT_VALUE_TYPE_DESCRIPTOR, Float.class)
+            .put(PROTO_DOUBLE_VALUE_TYPE_DESCRIPTOR, Double.class)
+            .put(PROTO_BOOL_VALUE_TYPE_DESCRIPTOR, Boolean.class)
+            .put(PROTO_STRING_VALUE_TYPE_DESCRIPTOR, String.class)
+            .put(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR, byte[].class)
+            .build();
+
+    @Override
+    public Type convert(TypeDescriptor typeDescriptor) {
+      if (typeDescriptor.equals(BYTE_STRING_TYPE_DESCRIPTOR)
+          || typeDescriptor.isSubtypeOf(BYTE_STRING_TYPE_DESCRIPTOR)) {
+        return byte[].class;
+      } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ProtocolMessageEnum.class))) {
+        return Integer.class;
+      } else if (typeDescriptor.equals(PROTO_TIMESTAMP_TYPE_DESCRIPTOR)
+          || typeDescriptor.equals(PROTO_DURATION_TYPE_DESCRIPTOR)) {
+        return Row.class;
+      } else {
+        Type type = TYPE_OVERRIDES.get(typeDescriptor);
+        return (type != null) ? type : super.convert(typeDescriptor);
+      }
+    }
+  }
+
+  static class ProtoConvertValueForGetter extends ConvertValueForGetter {
+    ProtoConvertValueForGetter(StackManipulation readValue) {
+      super(readValue);
+    }
+
+    @Override
+    protected ProtoTypeConversionsFactory getFactory() {
+      return new ProtoTypeConversionsFactory();
+    }
+
+    @Override
+    public StackManipulation convert(TypeDescriptor type) {
+      if (type.equals(BYTE_STRING_TYPE_DESCRIPTOR)
+          || type.isSubtypeOf(BYTE_STRING_TYPE_DESCRIPTOR)) {
+        return new Compound(
+            readValue,
+            MethodInvocation.invoke(
+                BYTE_STRING_TYPE
+                    .getDeclaredMethods()
+                    .filter(ElementMatchers.named("toByteArray"))
+                    .getOnly()));
+      } else if (type.isSubtypeOf(TypeDescriptor.of(ProtocolMessageEnum.class))) {
+        return new Compound(
+            readValue,
+            MethodInvocation.invoke(
+                PROTO_ENUM_TYPE
+                    .getDeclaredMethods()
+                    .filter(
+                        ElementMatchers.named("getNumber").and(ElementMatchers.takesArguments(0)))
+                    .getOnly()),
+            Assigner.DEFAULT.assign(
+                INTEGER_TYPE.asUnboxed().asGenericType(),
+                INTEGER_TYPE.asGenericType(),
+                Typing.STATIC));
+      } else if (type.equals(PROTO_TIMESTAMP_TYPE_DESCRIPTOR)) {
+        return new Compound(
+            readValue,
+            MethodInvocation.invoke(
+                new ForLoadedType(TimestampNanos.class)
+                    .getDeclaredMethods()
+                    .filter(ElementMatchers.named("toRow"))
+                    .getOnly()));
+      } else if (type.equals(PROTO_DURATION_TYPE_DESCRIPTOR)) {
+        return new Compound(
+            readValue,
+            MethodInvocation.invoke(
+                new ForLoadedType(DurationNanos.class)
+                    .getDeclaredMethods()
+                    .filter(ElementMatchers.named("toRow"))
+                    .getOnly()));
+      } else {
+        ForLoadedType wrapperType = WRAPPER_LOADED_TYPES.get(type);
+        if (wrapperType != null) {
+          MethodDescription.InDefinedShape getValueMethod =
+              wrapperType.getDeclaredMethods().filter(ElementMatchers.named("getValue")).getOnly();
+          TypeDescription.Generic returnType = getValueMethod.getReturnType();
+          StackManipulation stackManipulation =
+              new Compound(
+                  readValue,
+                  MethodInvocation.invoke(getValueMethod),
+                  Assigner.DEFAULT.assign(
+                      returnType, returnType.asErasure().asBoxed().asGenericType(), Typing.STATIC));
+          if (type.equals(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR)) {
+            stackManipulation =
+                getFactory()
+                    .createGetterConversions(stackManipulation)
+                    .convert(BYTE_STRING_TYPE_DESCRIPTOR);
+          }
+          return stackManipulation;
+        }
+        return super.convert(type);
+      }
+    }
+  }
+
+  static class ProtoConvertValueForSetter extends ConvertValueForSetter {
+    ProtoConvertValueForSetter(StackManipulation readValue) {
+      super(readValue);
+    }
+
+    @Override
+    protected ProtoTypeConversionsFactory getFactory() {
+      return new ProtoTypeConversionsFactory();
+    }
+
+    @Override
+    public StackManipulation convert(TypeDescriptor type) {
+      if (type.isSubtypeOf(TypeDescriptor.of(ByteString.class))) {
+        return new Compound(
+            readValue,
+            TypeCasting.to(BYTE_ARRAY_TYPE),
+            MethodInvocation.invoke(
+                BYTE_STRING_TYPE
+                    .getDeclaredMethods()
+                    .filter(
+                        ElementMatchers.named("copyFrom")
+                            .and(ElementMatchers.takesArguments(BYTE_ARRAY_TYPE)))
+                    .getOnly()));
+      } else if (type.isSubtypeOf(TypeDescriptor.of(ProtocolMessageEnum.class))) {
+        ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+        // Convert the stored number back to the enum constant.
+        return new Compound(
+            readValue,
+            Assigner.DEFAULT.assign(
+                INTEGER_TYPE.asBoxed().asGenericType(),
+                INTEGER_TYPE.asUnboxed().asGenericType(),
+                Typing.STATIC),
+            MethodInvocation.invoke(
+                loadedType
+                    .getDeclaredMethods()
+                    .filter(
+                        ElementMatchers.named("forNumber")
+                            .and(ElementMatchers.isStatic().and(ElementMatchers.takesArguments(1))))
+                    .getOnly()));
+      } else if (type.equals(PROTO_TIMESTAMP_TYPE_DESCRIPTOR)) {
+        return new Compound(
+            readValue,
+            MethodInvocation.invoke(
+                new ForLoadedType(TimestampNanos.class)
+                    .getDeclaredMethods()
+                    .filter(ElementMatchers.named("toTimestamp"))
+                    .getOnly()));
+      } else if (type.equals(PROTO_DURATION_TYPE_DESCRIPTOR)) {
+        return new Compound(
+            readValue,
+            MethodInvocation.invoke(
+                new ForLoadedType(DurationNanos.class)
+                    .getDeclaredMethods()
+                    .filter(ElementMatchers.named("toDuration"))
+                    .getOnly()));
+      } else {
+        ForLoadedType wrapperType = WRAPPER_LOADED_TYPES.get(type);
+        if (wrapperType != null) {
+          if (type.equals(PROTO_BYTES_VALUE_TYPE_DESCRIPTOR)) {
+            readValue =
+                getFactory()
+                    .createSetterConversions(readValue)
+                    .convert(TypeDescriptor.of(ByteString.class));
+          }
+          MethodDescription.InDefinedShape ofMethod =
+              wrapperType.getDeclaredMethods().filter(ElementMatchers.named("of")).getOnly();
+          TypeDescription.Generic argumentType = ofMethod.getParameters().get(0).getType();
+          return new Compound(
+              readValue,
+              Assigner.DEFAULT.assign(
+                  argumentType.asErasure().asBoxed().asGenericType(), argumentType, Typing.STATIC),
+              MethodInvocation.invoke(ofMethod));
+        } else {
+          return super.convert(type);
+        }
+      }
+    }
+  }
+
+  static class ProtoTypeConversionsFactory implements TypeConversionsFactory {
+    @Override
+    public TypeConversion<Type> createTypeConversion(boolean returnRawTypes) {
+      return new ProtoConvertType(returnRawTypes);
+    }
+
+    @Override
+    public TypeConversion<StackManipulation> createGetterConversions(StackManipulation readValue) {
+      return new ProtoConvertValueForGetter(readValue);
+    }
+
+    @Override
+    public TypeConversion<StackManipulation> createSetterConversions(StackManipulation readValue) {
+      return new ProtoConvertValueForSetter(readValue);
+    }
+  }
+
+  // The list of getters for a class is cached, so we only create the classes the first time
+  // getGetters is called.
+  private static final Map<ClassWithSchema, List<FieldValueGetter>> CACHED_GETTERS =
+      Maps.newConcurrentMap();
+
+  /**
+   * Return the list of {@link FieldValueGetter}s for a generated protocol buffer message class.
+   *
+   * <p>The returned list is ordered by the order of fields in the schema.
+   */
+  public static List<FieldValueGetter> getGetters(
+      Class<?> clazz,
+      Schema schema,
+      FieldValueTypeSupplier fieldValueTypeSupplier,
+      TypeConversionsFactory typeConversionsFactory) {
+    return CACHED_GETTERS.computeIfAbsent(
+        ClassWithSchema.create(clazz, schema),
+        c -> {
+          // Only build the reflective method map on a cache miss; cached lookups never use it.
+          Multimap<String, Method> methods = ReflectUtils.getMethodsMap(clazz);
+          return fieldValueTypeSupplier.get(clazz, schema).stream()
+              .map(
+                  t ->
+                      createGetter(
+                          t,
+                          typeConversionsFactory,
+                          clazz,
+                          methods,
+                          schema.getField(t.getName()),
+                          fieldValueTypeSupplier))
+              .collect(Collectors.toList());
+        });
+  }
+
+  static class OneOfFieldValueGetter<ProtoT extends MessageLite>
+      implements FieldValueGetter<ProtoT, OneOfType.Value> {
+    private final String name;
+    private final Supplier<Method> getCaseMethod;
+    private final Map<Integer, FieldValueGetter<ProtoT, ?>> getterMethodMap;
+    private final OneOfType oneOfType;
+
+    public OneOfFieldValueGetter(
+        String name,
+        Supplier<Method> getCaseMethod,
+        Map<Integer, FieldValueGetter<ProtoT, ?>> getterMethodMap,
+        OneOfType oneOfType) {
+      this.name = name;
+      this.getCaseMethod = getCaseMethod;
+      this.getterMethodMap = getterMethodMap;
+      this.oneOfType = oneOfType;
+    }
+
+    @Nullable
+    @Override
+    public Value get(ProtoT object) {
+      try {
+        EnumLite caseValue = (EnumLite) getCaseMethod.get().invoke(object);
+        if (caseValue.getNumber() == 0) {
+          return null;
+        } else {
+          Object value = getterMethodMap.get(caseValue.getNumber()).get(object);
+          return oneOfType.createValue(
+              oneOfType.getCaseEnumType().valueOf(caseValue.getNumber()), value);
+        }
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+  }
+
+  private static FieldValueGetter createGetter(
+      FieldValueTypeInformation fieldValueTypeInformation,
+      TypeConversionsFactory typeConversionsFactory,
+      Class clazz,
+      Multimap<String, Method> methods,
+      Field field,
+      FieldValueTypeSupplier fieldValueTypeSupplier) {
+    if (field.getType().isLogicalType(OneOfType.IDENTIFIER)) {
+      OneOfType oneOfType = field.getType().getLogicalType(OneOfType.class);
+
+      // The case accessor method in the proto is named getOneOfNameCase.
+      Method caseMethod =
+          getProtoGetter(
+              methods,
+              field.getName() + "_case",
+              FieldType.logicalType(oneOfType.getCaseEnumType()));
+      Map<Integer, FieldValueGetter> oneOfGetters = Maps.newHashMap();
+      Map<String, FieldValueTypeInformation> oneOfFieldTypes =
+          fieldValueTypeSupplier.get(clazz, oneOfType.getOneOfSchema()).stream()
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, f -> f));
+      for (Field oneOfField : oneOfType.getOneOfSchema().getFields()) {
+        int protoFieldIndex = getFieldNumber(oneOfField.getType());
+        FieldValueGetter oneOfFieldGetter =
+            createGetter(
+                oneOfFieldTypes.get(oneOfField.getName()),
+                typeConversionsFactory,
+                clazz,
+                methods,
+                oneOfField,
+                fieldValueTypeSupplier);
+        oneOfGetters.put(protoFieldIndex, oneOfFieldGetter);
+      }
+      return new OneOfFieldValueGetter(
+          field.getName(),
+          (Supplier<Method> & Serializable) () -> caseMethod,
+          oneOfGetters,
+          oneOfType);
+    } else {
+      return JavaBeanUtils.createGetter(fieldValueTypeInformation, typeConversionsFactory);
+    }
+  }
+
+  private static Class getProtoGeneratedBuilder(Class<?> clazz) {
+    String builderClassName = clazz.getName() + "$Builder";
+    try {
+      return Class.forName(builderClassName);
+    } catch (ClassNotFoundException e) {
+      return null;
+    }
+  }
+
+  static Method getProtoSetter(Multimap<String, Method> methods, String name, FieldType fieldType) {
+    final TypeDescriptor<MessageLite.Builder> builderDescriptor =
+        TypeDescriptor.of(MessageLite.Builder.class);
+    return methods.get(protoSetterName(name, fieldType)).stream()
+        // Setter methods take only a single parameter.
+        .filter(m -> m.getParameterCount() == 1)
+        // For nested types, we don't use the version that takes a builder.
+        .filter(
+            m -> !TypeDescriptor.of(m.getGenericParameterTypes()[0]).isSubtypeOf(builderDescriptor))
+        .findAny()
+        .orElseThrow(() -> new IllegalArgumentException("No setter found for field " + name));
+  }
+
+  static Method getProtoGetter(Multimap<String, Method> methods, String name, FieldType fieldType) {
+    return methods.get(protoGetterName(name, fieldType)).stream()
+        .filter(m -> m.getParameterCount() == 0)
+        .findAny()
+        .orElseThrow(() -> new IllegalArgumentException("No getter found for field " + name));
+  }
+
+  @Nullable
+  public static SchemaUserTypeCreator getBuilderCreator(
+      Class<?> protoClass, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
+    Class<?> builderClass = getProtoGeneratedBuilder(protoClass);
+    if (builderClass == null) {
+      return null;
+    }
+
+    List<FieldValueSetter> setters = Lists.newArrayListWithCapacity(schema.getFieldCount());
+    Multimap<String, Method> methods = ReflectUtils.getMethodsMap(builderClass);
+    for (Field field : schema.getFields()) {
+      if (field.getType().isLogicalType(OneOfType.IDENTIFIER)) {
+        OneOfType oneOfType = field.getType().getLogicalType(OneOfType.class);
+        Map<Integer, Method> oneOfMethods = Maps.newHashMap();
+        for (Field oneOfField : oneOfType.getOneOfSchema().getFields()) {
+          Method method = getProtoSetter(methods, oneOfField.getName(), oneOfField.getType());
+          oneOfMethods.put(getFieldNumber(oneOfField.getType()), method);
+        }
+        setters.add(
+            new ProtoOneOfSetter(
+                (Function<Integer, Method> & Serializable) oneOfMethods::get, field.getName()));
+      } else {
+        Method method = getProtoSetter(methods, field.getName(), field.getType());
+        setters.add(
+            JavaBeanUtils.createSetter(
+                FieldValueTypeInformation.forSetter(method, protoSetterPrefix(field.getType())),
+                new ProtoTypeConversionsFactory()));
+      }
+    }
+    return createBuilderCreator(protoClass, builderClass, setters, schema);
+  }
+
+  static class ProtoOneOfSetter<BuilderT extends MessageLite.Builder>
+      implements FieldValueSetter<BuilderT, OneOfType.Value> {
+    private final Function<Integer, Method> methods;
+    private final String name;
+
+    ProtoOneOfSetter(Function<Integer, Method> methods, String name) {
+      this.methods = methods;
+      this.name = name;
+    }
+
+    @Override
+    public void set(BuilderT builder, OneOfType.Value oneOfValue) {
+      Method method = methods.apply(oneOfValue.getCaseType().getValue());
+      try {
+        method.invoke(builder, oneOfValue.getValue());
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+  }
+
+  /** Generates a builder {@link Supplier} for {@code protoClass} and wraps it in a creator. */
+  static SchemaUserTypeCreator createBuilderCreator(
+      Class<?> protoClass, Class<?> builderClass, List<FieldValueSetter> setters, Schema schema) {
+    try {
+      // get() is implemented by BuilderSupplier, which invokes protoClass.newBuilder().
+      DynamicType.Builder<Supplier> builder =
+          BYTE_BUDDY
+              .with(new InjectPackageStrategy(builderClass))
+              .subclass(Supplier.class)
+              .method(ElementMatchers.named("get"))
+              .intercept(new BuilderSupplier(protoClass));
+      Supplier supplier =
+          builder
+              .make()
+              .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+              .getLoaded()
+              .getDeclaredConstructor()
+              .newInstance();
+      return new ProtoCreatorFactory(supplier, setters);
+    } catch (ReflectiveOperationException e) {
+      // Chain the cause so the underlying reflection failure is not lost.
+      throw new RuntimeException(
+          "Unable to generate a creator for class " + builderClass + " with schema " + schema, e);
+    }
+  }
+
+  static class ProtoCreatorFactory implements SchemaUserTypeCreator {
+    private final Supplier<? extends MessageLite.Builder> builderCreator;
+    private final List<FieldValueSetter> setters;
+
+    public ProtoCreatorFactory(
+        Supplier<? extends MessageLite.Builder> builderCreator, List<FieldValueSetter> setters) {
+      this.builderCreator = builderCreator;
+      this.setters = setters;
+    }
+
+    @Override
+    public Object create(Object... params) {
+      MessageLite.Builder builder = builderCreator.get();
+      for (int i = 0; i < params.length; ++i) {
+        setters.get(i).set(builder, params[i]);
+      }
+      return builder.build();
+    }
+  }
+
+  static class BuilderSupplier implements Implementation {
+    private final Class<?> protoClass;
+
+    public BuilderSupplier(Class<?> protoClass) {
+      this.protoClass = protoClass;
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      ForLoadedType loadedProto = new ForLoadedType(protoClass);
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        // this + method parameters.
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+        // Create the builder object by calling ProtoClass.newBuilder().
+        StackManipulation stackManipulation =
+            new StackManipulation.Compound(
+                MethodInvocation.invoke(
+                    loadedProto
+                        .getDeclaredMethods()
+                        .filter(
+                            ElementMatchers.named("newBuilder")
+                                .and(ElementMatchers.takesArguments(0)))
+                        .getOnly()),
+                MethodReturn.REFERENCE);
+        StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+        return new ByteCodeAppender.Size(size.getMaximalSize(), numLocals);
+      };
+    }
+  }
+}
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
new file mode 100644
index 0000000..47a928c
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static org.apache.beam.sdk.extensions.protobuf.ProtoByteBuddyUtils.getProtoGetter;
+
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.extensions.protobuf.ProtoByteBuddyUtils.ProtoTypeConversionsFactory;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
+
+/**
+ * A {@link GetterBasedSchemaProvider} for compiled protocol buffer message classes.
+ *
+ * <p>Schemas are obtained from {@link ProtoSchemaTranslator}; field getters and the creator are
+ * obtained from {@link ProtoByteBuddyUtils}. {@link DynamicMessage} is rejected by {@link
+ * #schemaFor(TypeDescriptor)}.
+ */
+@Experimental(Kind.SCHEMAS)
+public class ProtoMessageSchema extends GetterBasedSchemaProvider {
+
+  /** Resolves, for every field of a schema, the getter method of the proto class that backs it. */
+  private static final class ProtoClassFieldValueTypeSupplier implements FieldValueTypeSupplier {
+    /**
+     * Not supported; this supplier only answers lookups that also provide a schema.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz) {
+      throw new UnsupportedOperationException("Unexpected call.");
+    }
+
+    /**
+     * Returns one {@link FieldValueTypeInformation} per schema field, in schema order.
+     *
+     * @param clazz the proto class whose methods are searched for getters
+     * @param schema the schema whose fields are resolved
+     */
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Multimap<String, Method> methods = ReflectUtils.getMethodsMap(clazz);
+      List<FieldValueTypeInformation> types =
+          Lists.newArrayListWithCapacity(schema.getFieldCount());
+      for (Field field : schema.getFields()) {
+        if (field.getType().isLogicalType(OneOfType.IDENTIFIER)) {
+          // This is a OneOf. Look for the getters for each OneOf option.
+          OneOfType oneOfType = field.getType().getLogicalType(OneOfType.class);
+          Map<String, FieldValueTypeInformation> oneOfTypes = Maps.newHashMap();
+          for (Field oneOfField : oneOfType.getOneOfSchema().getFields()) {
+            Method method = getProtoGetter(methods, oneOfField.getName(), oneOfField.getType());
+            // NOTE(review): each option's type information is keyed by the option name but is
+            // itself named after the enclosing OneOf field - confirm this is intended.
+            oneOfTypes.put(
+                oneOfField.getName(),
+                FieldValueTypeInformation.forGetter(method).withName(field.getName()));
+          }
+          // Add an entry that encapsulates information about all possible getters.
+          types.add(
+              FieldValueTypeInformation.forOneOf(
+                      field.getName(), field.getType().getNullable(), oneOfTypes)
+                  .withName(field.getName()));
+        } else {
+          // This is a simple field. Add the getter.
+          Method method = getProtoGetter(methods, field.getName(), field.getType());
+          types.add(FieldValueTypeInformation.forGetter(method).withName(field.getName()));
+        }
+      }
+      return types;
+    }
+  }
+
+  /**
+   * Returns the schema that {@link ProtoSchemaTranslator} derives for the given type.
+   *
+   * @throws IllegalArgumentException if the type is {@link DynamicMessage}
+   */
+  @Nullable
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
+    checkForDynamicType(typeDescriptor);
+    // The raw type is handed over as a Message class; nothing here verifies that it really is one.
+    return ProtoSchemaTranslator.getSchema((Class<Message>) typeDescriptor.getRawType());
+  }
+
+  /** Returns the getters that {@link ProtoByteBuddyUtils} provides for the class and schema. */
+  @Override
+  public List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
+    return ProtoByteBuddyUtils.getGetters(
+        targetClass,
+        schema,
+        new ProtoClassFieldValueTypeSupplier(),
+        new ProtoTypeConversionsFactory());
+  }
+
+  /** Returns the per-field type information resolved from the proto class getters. */
+  @Override
+  public List<FieldValueTypeInformation> fieldValueTypeInformations(
+      Class<?> targetClass, Schema schema) {
+    return JavaBeanUtils.getFieldTypes(targetClass, schema, new ProtoClassFieldValueTypeSupplier());
+  }
+
+  /**
+   * Returns the creator that {@link ProtoByteBuddyUtils} provides for the class and schema.
+   *
+   * @throws RuntimeException if no creator could be obtained
+   */
+  @Override
+  public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
+    SchemaUserTypeCreator creator =
+        ProtoByteBuddyUtils.getBuilderCreator(
+            targetClass, schema, new ProtoClassFieldValueTypeSupplier());
+    if (creator == null) {
+      throw new RuntimeException("Cannot create creator for " + targetClass);
+    }
+    return creator;
+  }
+
+  /**
+   * Rejects {@link DynamicMessage}, which this provider does not handle.
+   *
+   * @throws IllegalArgumentException if the raw type of the descriptor is {@link DynamicMessage}
+   */
+  private <T> void checkForDynamicType(TypeDescriptor<T> typeDescriptor) {
+    if (typeDescriptor.getRawType().equals(DynamicMessage.class)) {
+      throw new IllegalArgumentException(
+          "DynamicMessage is not allowed for the standard ProtoMessageSchema, "
+              + "use ProtoDynamicMessageSchema instead.");
+    }
+  }
+}
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
new file mode 100644
index 0000000..8952f9a
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchemaTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.withFieldNumber;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMITIVE_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMITIVE_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.MAP_PRIMITIVE_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_BOOL;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_INT32;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_PRIMITIVE;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_STRING;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_BOOL;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_INT32;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_PRIMITIVE;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_ROW_STRING;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.OUTER_ONEOF_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.OUTER_ONEOF_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.OUTER_ONEOF_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.PRIMITIVE_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.REPEATED_SCHEMA;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_PROTO;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_ROW;
+import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.WKT_MESSAGE_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.EnumMessage.Enum;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.MapPrimitive;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Nested;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OneOf;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.OuterOneOf;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.Primitive;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.RepeatPrimitive;
+import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages.WktMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ProtoMessageSchemaTest {
+
+  @Test
+  public void testPrimitiveSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(Primitive.class));
+    assertEquals(PRIMITIVE_SCHEMA, schema);
+  }
+
+  @Test
+  public void testPrimitiveProtoToRow() {
+    SerializableFunction<Primitive, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(Primitive.class));
+    assertEquals(PRIMITIVE_ROW, toRow.apply(PRIMITIVE_PROTO));
+  }
+
+  @Test
+  public void testPrimitiveRowToProto() {
+    SerializableFunction<Row, Primitive> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(Primitive.class));
+    assertEquals(PRIMITIVE_PROTO, fromRow.apply(PRIMITIVE_ROW));
+  }
+
+  @Test
+  public void testRepeatedSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(RepeatPrimitive.class));
+    assertEquals(REPEATED_SCHEMA, schema);
+  }
+
+  @Test
+  public void testRepeatedProtoToRow() {
+    SerializableFunction<RepeatPrimitive, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
+    assertEquals(REPEATED_ROW, toRow.apply(REPEATED_PROTO));
+  }
+
+  @Test
+  public void testRepeatedRowToProto() {
+    SerializableFunction<Row, RepeatPrimitive> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
+    assertEquals(REPEATED_PROTO, fromRow.apply(REPEATED_ROW));
+  }
+
+  // Test map type
+  @Test
+  public void testMapSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(MapPrimitive.class));
+    assertEquals(MAP_PRIMITIVE_SCHEMA, schema);
+  }
+
+  @Test
+  public void testMapProtoToRow() {
+    SerializableFunction<MapPrimitive, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(MapPrimitive.class));
+    assertEquals(MAP_PRIMITIVE_ROW, toRow.apply(MAP_PRIMITIVE_PROTO));
+  }
+
+  @Test
+  public void testMapRowToProto() {
+    SerializableFunction<Row, MapPrimitive> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(MapPrimitive.class));
+    assertEquals(MAP_PRIMITIVE_PROTO, fromRow.apply(MAP_PRIMITIVE_ROW));
+  }
+
+  @Test
+  public void testNestedSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(Nested.class));
+    assertEquals(NESTED_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedProtoToRow() {
+    SerializableFunction<Nested, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(Nested.class));
+    assertEquals(NESTED_ROW, toRow.apply(NESTED_PROTO));
+  }
+
+  @Test
+  public void testNestedRowToProto() {
+    SerializableFunction<Row, Nested> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(Nested.class));
+    assertEquals(NESTED_PROTO, fromRow.apply(NESTED_ROW));
+  }
+
+  @Test
+  public void testOneOfSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(OneOf.class));
+    assertEquals(ONEOF_SCHEMA, schema);
+  }
+
+  @Test
+  public void testOneOfProtoToRow() {
+    SerializableFunction<OneOf, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(OneOf.class));
+    assertEquals(ONEOF_ROW_INT32, toRow.apply(ONEOF_PROTO_INT32));
+    assertEquals(ONEOF_ROW_BOOL, toRow.apply(ONEOF_PROTO_BOOL));
+    assertEquals(ONEOF_ROW_STRING, toRow.apply(ONEOF_PROTO_STRING));
+    assertEquals(ONEOF_ROW_PRIMITIVE, toRow.apply(ONEOF_PROTO_PRIMITIVE));
+  }
+
+  @Test
+  public void testOneOfRowToProto() {
+    SerializableFunction<Row, OneOf> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(OneOf.class));
+    assertEquals(ONEOF_PROTO_INT32, fromRow.apply(ONEOF_ROW_INT32));
+    assertEquals(ONEOF_PROTO_BOOL, fromRow.apply(ONEOF_ROW_BOOL));
+    assertEquals(ONEOF_PROTO_STRING, fromRow.apply(ONEOF_ROW_STRING));
+    assertEquals(ONEOF_PROTO_PRIMITIVE, fromRow.apply(ONEOF_ROW_PRIMITIVE));
+  }
+
+  @Test
+  public void testOuterOneOfSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(OuterOneOf.class));
+    assertEquals(OUTER_ONEOF_SCHEMA, schema);
+  }
+
+  @Test
+  public void testOuterOneOfProtoToRow() {
+    SerializableFunction<OuterOneOf, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(OuterOneOf.class));
+    assertEquals(OUTER_ONEOF_ROW, toRow.apply(OUTER_ONEOF_PROTO));
+  }
+
+  @Test
+  public void testOuterOneOfRowToProto() {
+    SerializableFunction<Row, OuterOneOf> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(OuterOneOf.class));
+    assertEquals(OUTER_ONEOF_PROTO, fromRow.apply(OUTER_ONEOF_ROW));
+  }
+
+  private static final EnumerationType ENUM_TYPE =
+      EnumerationType.create(ImmutableMap.of("ZERO", 0, "TWO", 2, "THREE", 3));
+  private static final Schema ENUM_SCHEMA =
+      Schema.builder()
+          .addField("enum", withFieldNumber(FieldType.logicalType(ENUM_TYPE).withNullable(true), 1))
+          .build();
+  private static final Row ENUM_ROW =
+      Row.withSchema(ENUM_SCHEMA).addValues(ENUM_TYPE.valueOf("TWO")).build();
+  private static final EnumMessage ENUM_PROTO = EnumMessage.newBuilder().setEnum(Enum.TWO).build();
+
+  @Test
+  public void testEnumSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(EnumMessage.class));
+    assertEquals(ENUM_SCHEMA, schema);
+  }
+
+  @Test
+  public void testEnumProtoToRow() {
+    SerializableFunction<EnumMessage, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(EnumMessage.class));
+    assertEquals(ENUM_ROW, toRow.apply(ENUM_PROTO));
+  }
+
+  @Test
+  public void testEnumRowToProto() {
+    SerializableFunction<Row, EnumMessage> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(EnumMessage.class));
+    assertEquals(ENUM_PROTO, fromRow.apply(ENUM_ROW));
+  }
+
+  @Test
+  public void testWktMessageSchema() {
+    Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(WktMessage.class));
+    assertEquals(WKT_MESSAGE_SCHEMA, schema);
+  }
+
+  @Test
+  public void testWktProtoToRow() {
+    SerializableFunction<WktMessage, Row> toRow =
+        new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(WktMessage.class));
+    assertEquals(WKT_MESSAGE_ROW, toRow.apply(WKT_MESSAGE_PROTO));
+  }
+
+  @Test
+  public void testWktRowToProto() {
+    SerializableFunction<Row, WktMessage> fromRow =
+        new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(WktMessage.class));
+    assertEquals(WKT_MESSAGE_PROTO, fromRow.apply(WKT_MESSAGE_ROW));
+  }
+}
diff --git a/sdks/java/extensions/protobuf/src/test/resources/README.md b/sdks/java/extensions/protobuf/src/test/resources/README.md
new file mode 100644
index 0000000..79083f5
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/resources/README.md
@@ -0,0 +1,34 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
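+The steps below recreate the proto descriptor set included in this resource directory.
+
+First, set `PROTO_INCLUDE` to the directory that contains the protobuf include files; it is
+passed to `protoc` as an additional include path (`-I`):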
+
+```bash
+export PROTO_INCLUDE=<proto_include_dir>
+```
+Then run the following command from the Beam root folder to create the `.pb` file:
+
+```bash
+protoc \
+ -Isdks/java/extensions/protobuf/src/test/resources/ \
+ -I$PROTO_INCLUDE \
+ --descriptor_set_out=sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb \
+ --include_imports \
+ sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto
+```
diff --git a/sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb b/sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb
new file mode 100644
index 0000000..4e97ad0
Binary files /dev/null and b/sdks/java/extensions/protobuf/src/test/resources/org/apache/beam/sdk/extensions/protobuf/test_option_v1.pb differ
diff --git a/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/option.proto b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/option.proto
new file mode 100644
index 0000000..ca40119
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/option.proto
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package test.option.v1;
+
+import "google/protobuf/descriptor.proto";
+
+// Custom file-level options used as test fixtures: one singular option for each scalar type
+// listed below, plus one message-typed and one enum-typed option.
+extend google.protobuf.FileOptions {
+    double fileoption_double = 66666700;
+    float fileoption_float = 66666701;
+    int32 fileoption_int32 = 66666702;
+    int64 fileoption_int64 = 66666703;
+    uint32 fileoption_uint32 = 66666704;
+    uint64 fileoption_uint64 = 66666705;
+    sint32 fileoption_sint32 = 66666706;
+    sint64 fileoption_sint64 = 66666707;
+    fixed32 fileoption_fixed32 = 66666708;
+    fixed64 fileoption_fixed64 = 66666709;
+    sfixed32 fileoption_sfixed32 = 66666710;
+    sfixed64 fileoption_sfixed64 = 66666711;
+    bool fileoption_bool = 66666712;
+    string fileoption_string = 66666713;
+    bytes fileoption_bytes = 66666714;
+    OptionMessage fileoption_message = 66666715;
+    OptionEnum fileoption_enum = 66666716;
+}
+
+// Custom message-level options used as test fixtures. Numbers 66666700-66666716 are the
+// singular options; numbers 66666800-66666816 are the repeated variants of the same types.
+extend google.protobuf.MessageOptions {
+    // Singular options.
+    double messageoption_double = 66666700;
+    float messageoption_float = 66666701;
+    int32 messageoption_int32 = 66666702;
+    int64 messageoption_int64 = 66666703;
+    uint32 messageoption_uint32 = 66666704;
+    uint64 messageoption_uint64 = 66666705;
+    sint32 messageoption_sint32 = 66666706;
+    sint64 messageoption_sint64 = 66666707;
+    fixed32 messageoption_fixed32 = 66666708;
+    fixed64 messageoption_fixed64 = 66666709;
+    sfixed32 messageoption_sfixed32 = 66666710;
+    sfixed64 messageoption_sfixed64 = 66666711;
+    bool messageoption_bool = 66666712;
+    string messageoption_string = 66666713;
+    bytes messageoption_bytes = 66666714;
+    OptionMessage messageoption_message = 66666715;
+    OptionEnum messageoption_enum = 66666716;
+
+    // Repeated options.
+    repeated double messageoption_repeated_double = 66666800;
+    repeated float messageoption_repeated_float = 66666801;
+    repeated int32 messageoption_repeated_int32 = 66666802;
+    repeated int64 messageoption_repeated_int64 = 66666803;
+    repeated uint32 messageoption_repeated_uint32 = 66666804;
+    repeated uint64 messageoption_repeated_uint64 = 66666805;
+    repeated sint32 messageoption_repeated_sint32 = 66666806;
+    repeated sint64 messageoption_repeated_sint64 = 66666807;
+    repeated fixed32 messageoption_repeated_fixed32 = 66666808;
+    repeated fixed64 messageoption_repeated_fixed64 = 66666809;
+    repeated sfixed32 messageoption_repeated_sfixed32 = 66666810;
+    repeated sfixed64 messageoption_repeated_sfixed64 = 66666811;
+    repeated bool messageoption_repeated_bool = 66666812;
+    repeated string messageoption_repeated_string = 66666813;
+    repeated bytes messageoption_repeated_bytes = 66666814;
+    repeated OptionMessage messageoption_repeated_message = 66666815;
+    repeated OptionEnum messageoption_repeated_enum = 66666816;
+}
+
+// Custom field-level options used as test fixtures. Numbers 66666700-66666716 are the
+// singular options; numbers 66666800-66666816 are the repeated variants of the same types.
+extend google.protobuf.FieldOptions {
+    // Singular options.
+    double fieldoption_double = 66666700;
+    float fieldoption_float = 66666701;
+    int32 fieldoption_int32 = 66666702;
+    int64 fieldoption_int64 = 66666703;
+    uint32 fieldoption_uint32 = 66666704;
+    uint64 fieldoption_uint64 = 66666705;
+    sint32 fieldoption_sint32 = 66666706;
+    sint64 fieldoption_sint64 = 66666707;
+    fixed32 fieldoption_fixed32 = 66666708;
+    fixed64 fieldoption_fixed64 = 66666709;
+    sfixed32 fieldoption_sfixed32 = 66666710;
+    sfixed64 fieldoption_sfixed64 = 66666711;
+    bool fieldoption_bool = 66666712;
+    string fieldoption_string = 66666713;
+    bytes fieldoption_bytes = 66666714;
+    OptionMessage fieldoption_message = 66666715;
+    OptionEnum fieldoption_enum = 66666716;
+
+    // Repeated options.
+    repeated double fieldoption_repeated_double = 66666800;
+    repeated float fieldoption_repeated_float = 66666801;
+    repeated int32 fieldoption_repeated_int32 = 66666802;
+    repeated int64 fieldoption_repeated_int64 = 66666803;
+    repeated uint32 fieldoption_repeated_uint32 = 66666804;
+    repeated uint64 fieldoption_repeated_uint64 = 66666805;
+    repeated sint32 fieldoption_repeated_sint32 = 66666806;
+    repeated sint64 fieldoption_repeated_sint64 = 66666807;
+    repeated fixed32 fieldoption_repeated_fixed32 = 66666808;
+    repeated fixed64 fieldoption_repeated_fixed64 = 66666809;
+    repeated sfixed32 fieldoption_repeated_sfixed32 = 66666810;
+    repeated sfixed64 fieldoption_repeated_sfixed64 = 66666811;
+    repeated bool fieldoption_repeated_bool = 66666812;
+    repeated string fieldoption_repeated_string = 66666813;
+    repeated bytes fieldoption_repeated_bytes = 66666814;
+    repeated OptionMessage fieldoption_repeated_message = 66666815;
+    repeated OptionEnum fieldoption_repeated_enum = 66666816;
+}
+
+// Enum used as the type of the enum-valued custom options declared in this file.
+enum OptionEnum {
+    DEFAULT = 0;
+    ENUM1 = 1;
+    ENUM2 = 2;
+}
+
+// Message used as the type of the message-valued custom options declared in this file.
+// It combines singular and repeated scalar fields with an enum field.
+message OptionMessage {
+    string string = 1;
+    repeated string repeated_string = 2;
+
+    int32 int32 = 3;
+    repeated int32 repeated_int32 = 4;
+
+    int64 int64 = 5;
+
+    OptionEnum test_enum = 6;
+}
\ No newline at end of file
diff --git a/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto
new file mode 100644
index 0000000..1750ddf
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/test/resources/test/option/v1/simple.proto
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "test/option/v1/option.proto";
+
+package test.option.v1;
+
+// Test message whose fields are named after the custom field options declared in
+// test/option/v1/option.proto. Only some of the fields actually carry the option their
+// name refers to; the others are plain int32 fields without any option attached.
+message MessageWithOptions {
+    string test_name = 1;
+    int32 test_index = 2;
+
+    // Fields 700-716 correspond to the singular field options. Fields 708-711, 714 and 715
+    // have no option set.
+    int32 field_with_fieldoption_double = 700 [(test.option.v1.fieldoption_double) = 100.1];
+    int32 field_with_fieldoption_float = 701 [(test.option.v1.fieldoption_float) = 101.2];
+    int32 field_with_fieldoption_int32 = 702 [(test.option.v1.fieldoption_int32) = 102];
+    int32 field_with_fieldoption_int64 = 703 [(test.option.v1.fieldoption_int64) = 103];
+    int32 field_with_fieldoption_uint32 = 704 [(test.option.v1.fieldoption_uint32) = 104];
+    int32 field_with_fieldoption_uint64 = 705 [(test.option.v1.fieldoption_uint64) = 105];
+    int32 field_with_fieldoption_sint32 = 706 [(test.option.v1.fieldoption_sint32) = 106];
+    int32 field_with_fieldoption_sint64 = 707 [(test.option.v1.fieldoption_sint64) = 107];
+    int32 field_with_fieldoption_fixed32 = 708;
+    int32 field_with_fieldoption_fixed64 = 709;
+    int32 field_with_fieldoption_sfixed32 = 710;
+    int32 field_with_fieldoption_sfixed64 = 711;
+    int32 field_with_fieldoption_bool = 712 [(test.option.v1.fieldoption_bool) = true];
+    int32 field_with_fieldoption_string = 713 [(test.option.v1.fieldoption_string) = "Oh yeah"];
+    int32 field_with_fieldoption_bytes = 714;
+    int32 field_with_fieldoption_message = 715;
+    int32 field_with_fieldoption_enum = 716 [(test.option.v1.fieldoption_enum) = ENUM1];
+
+    // Fields 800-816 correspond to the repeated field options. Only field 813 has an option
+    // set, and it sets the repeated string option twice.
+    int32 field_with_fieldoption_repeated_double = 800;
+    int32 field_with_fieldoption_repeated_float = 801;
+    int32 field_with_fieldoption_repeated_int32 = 802;
+    int32 field_with_fieldoption_repeated_int64 = 803;
+    int32 field_with_fieldoption_repeated_uint32 = 804;
+    int32 field_with_fieldoption_repeated_uint64 = 805;
+    int32 field_with_fieldoption_repeated_sint32 = 806;
+    int32 field_with_fieldoption_repeated_sint64 = 807;
+    int32 field_with_fieldoption_repeated_fixed32 = 808;
+    int32 field_with_fieldoption_repeated_fixed64 = 809;
+    int32 field_with_fieldoption_repeated_sfixed32 = 810;
+    int32 field_with_fieldoption_repeated_sfixed64 = 811;
+    int32 field_with_fieldoption_repeated_bool = 812;
+    int32 field_with_fieldoption_repeated_string = 813 [(test.option.v1.fieldoption_repeated_string) = "Oh yeah",
+                                                       (test.option.v1.fieldoption_repeated_string) = "Oh no"];
+    int32 field_with_fieldoption_repeated_bytes = 814;
+    int32 field_with_fieldoption_repeated_message = 815;
+    int32 field_with_fieldoption_repeated_enum = 816;
+
+}
+