You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2019/01/08 18:06:46 UTC

[beam] branch master updated: Merge pull request #7289: [BEAM-6240] Add a library of schema annotations for POJO and JavaBeans

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c379622  Merge pull request #7289: [BEAM-6240] Add a library of schema annotations for POJO and JavaBeans
c379622 is described below

commit c37962211fd12ad6aeada49f8eaf9b3911081568
Author: reuvenlax <re...@google.com>
AuthorDate: Tue Jan 8 10:06:37 2019 -0800

    Merge pull request #7289: [BEAM-6240] Add a library of schema annotations for POJO and JavaBeans
---
 .../runners/gearpump/GearpumpPipelineOptions.java  |   4 +-
 .../runners/gearpump/GearpumpPipelineResult.java   |   2 +-
 .../beam/runners/gearpump/GearpumpRunner.java      |   3 +-
 .../beam/sdk/schemas/FromRowUsingCreator.java      |  10 +-
 .../apache/beam/sdk/schemas/JavaBeanSchema.java    |  88 +++++---
 .../apache/beam/sdk/schemas/JavaFieldSchema.java   |  76 +++++--
 .../apache/beam/sdk/schemas/SchemaRegistry.java    |   1 +
 .../schemas/{ => annotations}/DefaultSchema.java   |   7 +-
 .../beam/sdk/schemas/annotations/SchemaCreate.java |  74 ++++++
 .../sdk/schemas/annotations/SchemaFieldName.java   |  54 +++++
 .../beam/sdk/schemas/annotations/SchemaIgnore.java |  51 +++++
 .../package-info.java}                             |  25 +--
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   |  26 +--
 .../beam/sdk/schemas/utils/ByteBuddyUtils.java     | 178 +++++++++++++++
 .../sdk/schemas/utils/FieldValueTypeSupplier.java  |  11 +-
 .../beam/sdk/schemas/utils/JavaBeanUtils.java      | 112 ++++++++--
 .../apache/beam/sdk/schemas/utils/POJOUtils.java   | 102 +++++++--
 .../beam/sdk/schemas/utils/ReflectUtils.java       |  39 ++++
 .../sdk/schemas/utils/StaticSchemaInference.java   |  32 ++-
 .../beam/sdk/schemas/JavaBeanSchemaTest.java       |  64 +++++-
 .../beam/sdk/schemas/JavaFieldSchemaTest.java      |  70 +++++-
 .../beam/sdk/schemas/SchemaRegistryTest.java       |   7 +-
 .../beam/sdk/schemas/transforms/CastTest.java      |   2 +-
 .../beam/sdk/schemas/transforms/ConvertTest.java   |   2 +-
 .../beam/sdk/schemas/transforms/FilterTest.java    |   2 +-
 .../beam/sdk/schemas/transforms/GroupTest.java     |   2 +-
 .../beam/sdk/schemas/transforms/SelectTest.java    |   2 +-
 .../beam/sdk/schemas/utils/JavaBeanUtilsTest.java  |  38 ++--
 .../beam/sdk/schemas/utils/POJOUtilsTest.java      |  36 +--
 .../beam/sdk/schemas/utils/TestJavaBeans.java      | 146 +++++++++++-
 .../apache/beam/sdk/schemas/utils/TestPOJOs.java   | 248 ++++++++++++++++++++-
 .../beam/sdk/transforms/ParDoSchemaTest.java       |   2 +-
 .../sdk/extensions/sql/example/model/Customer.java |   2 +-
 .../sdk/extensions/sql/example/model/Order.java    |   2 +-
 .../extensions/sql/InferredJavaBeanSqlTest.java    |   2 +-
 .../beam/sdk/io/clickhouse/ClickHouseIOTest.java   |   2 +-
 .../org/apache/beam/sdk/nexmark/model/Auction.java |   2 +-
 .../beam/sdk/nexmark/model/AuctionCount.java       |   2 +-
 .../beam/sdk/nexmark/model/AuctionPrice.java       |   2 +-
 .../org/apache/beam/sdk/nexmark/model/Bid.java     |   2 +-
 .../org/apache/beam/sdk/nexmark/model/Event.java   |   2 +-
 .../beam/sdk/nexmark/model/NameCityStateId.java    |   2 +-
 .../org/apache/beam/sdk/nexmark/model/Person.java  |   2 +-
 43 files changed, 1346 insertions(+), 192 deletions(-)

diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
index eadfc44..28ff8a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-
 /** Options that configure the Gearpump pipeline. */
 public interface GearpumpPipelineOptions extends PipelineOptions {
 
@@ -45,7 +44,8 @@ public interface GearpumpPipelineOptions extends PipelineOptions {
   @JsonIgnore
   Map<String, String> getSerializers();
 
-  @Description("Whether the pipeline will be run on a remote cluster. If false, it will be run on a EmbeddedCluster")
+  @Description(
+      "Whether the pipeline will be run on a remote cluster. If false, it will be run on a EmbeddedCluster")
   void setRemote(Boolean remote);
 
   @Default.Boolean(true)
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 7f9b0be..7e6a4c3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -1,4 +1,4 @@
-  /*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 3181b9c..2aca2ca 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -70,7 +70,8 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
       RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment());
     } else {
       RuntimeEnvironment.setRuntimeEnv(new EmbeddedRuntimeEnvironment());
-      config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
+      config =
+          config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
     }
     ClientContext clientContext = ClientContext.apply(config);
     options.setClientContext(clientContext);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 139281f..79f5afe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Lists;
@@ -55,8 +56,11 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
   public <ValueT> ValueT fromRow(
       Row row, Class<ValueT> clazz, Factory<List<FieldValueTypeInformation>> typeFactory) {
     if (row instanceof RowWithGetters) {
-      // Efficient path: simply extract the underlying object instead of creating a new one.
-      return (ValueT) ((RowWithGetters) row).getGetterTarget();
+      Object target = ((RowWithGetters) row).getGetterTarget();
+      if (target.getClass().equals(clazz)) {
+        // Efficient path: simply extract the underlying object instead of creating a new one.
+        return (ValueT) target;
+      }
     }
 
     Object[] params = new Object[row.getFieldCount()];
@@ -68,7 +72,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
 
     for (int i = 0; i < row.getFieldCount(); ++i) {
       FieldType type = schema.getField(i).getType();
-      FieldValueTypeInformation typeInformation = typeInformations.get(i);
+      FieldValueTypeInformation typeInformation = checkNotNull(typeInformations.get(i));
       params[i] =
           fromValue(
               type,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 8eb8022..b7e9dda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -18,12 +18,14 @@
 package org.apache.beam.sdk.schemas;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
 import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
 import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils;
@@ -47,19 +49,20 @@ public class JavaBeanSchema extends GetterBasedSchemaProvider {
   /** {@link FieldValueTypeSupplier} that's based on getter methods. */
   @VisibleForTesting
   public static class GetterTypeSupplier implements FieldValueTypeSupplier {
+    public static final GetterTypeSupplier INSTANCE = new GetterTypeSupplier();
+
     @Override
-    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
-      Map<String, FieldValueTypeInformation> types =
-          ReflectUtils.getMethods(clazz)
-              .stream()
-              .filter(ReflectUtils::isGetter)
-              .map(FieldValueTypeInformation::forGetter)
-              .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
-      // Return the list ordered by the schema fields.
-      return schema
-          .getFields()
+    public List<FieldValueTypeInformation> get(Class<?> clazz) {
+      return ReflectUtils.getMethods(clazz)
           .stream()
-          .map(f -> types.get(f.getName()))
+          .filter(ReflectUtils::isGetter)
+          .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
+          .map(FieldValueTypeInformation::forGetter)
+          .map(
+              t -> {
+                SchemaFieldName fieldName = t.getMethod().getAnnotation(SchemaFieldName.class);
+                return (fieldName != null) ? t.withName(fieldName.value()) : t;
+              })
           .collect(Collectors.toList());
     }
   }
@@ -67,50 +70,77 @@ public class JavaBeanSchema extends GetterBasedSchemaProvider {
   /** {@link FieldValueTypeSupplier} that's based on setter methods. */
   @VisibleForTesting
   public static class SetterTypeSupplier implements FieldValueTypeSupplier {
+    private static final SetterTypeSupplier INSTANCE = new SetterTypeSupplier();
+
     @Override
-    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
-      Map<String, FieldValueTypeInformation> types =
-          ReflectUtils.getMethods(clazz)
-              .stream()
-              .filter(ReflectUtils::isSetter)
-              .map(FieldValueTypeInformation::forSetter)
-              .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
-      // Return the list ordered by the schema fields.
-      return schema
-          .getFields()
+    public List<FieldValueTypeInformation> get(Class<?> clazz) {
+      return ReflectUtils.getMethods(clazz)
           .stream()
-          .map(f -> types.get(f.getName()))
+          .filter(ReflectUtils::isSetter)
+          .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
+          .map(FieldValueTypeInformation::forSetter)
           .collect(Collectors.toList());
     }
   }
 
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType());
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            typeDescriptor.getRawType(), GetterTypeSupplier.INSTANCE);
+
+    // If there are no creator methods, then validate that we have setters for every field.
+    // Otherwise, we will have no way of creating the class.
+    if (ReflectUtils.getAnnotatedCreateMethod(typeDescriptor.getRawType()) == null
+        && ReflectUtils.getAnnotatedConstructor(typeDescriptor.getRawType()) == null) {
+      JavaBeanUtils.validateJavaBean(
+          GetterTypeSupplier.INSTANCE.get(typeDescriptor.getRawType(), schema),
+          SetterTypeSupplier.INSTANCE.get(typeDescriptor.getRawType(), schema));
+    }
+    return schema;
   }
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
     return (Class<?> targetClass, Schema schema) ->
-        JavaBeanUtils.getGetters(targetClass, schema, new GetterTypeSupplier());
+        JavaBeanUtils.getGetters(targetClass, schema, GetterTypeSupplier.INSTANCE);
   }
 
   @Override
   UserTypeCreatorFactory schemaTypeCreatorFactory() {
-    return new SetterBasedCreatorFactory(new JavaBeanSetterFactory());
+    UserTypeCreatorFactory setterBasedFactory =
+        new SetterBasedCreatorFactory(new JavaBeanSetterFactory());
+
+    return (Class<?> targetClass, Schema schema) -> {
+      // If a static method is marked with @SchemaCreate, use that.
+      Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
+      if (annotated != null) {
+        return JavaBeanUtils.getStaticCreator(
+            targetClass, annotated, schema, GetterTypeSupplier.INSTANCE);
+      }
+
+      // If a Constructor was tagged with @SchemaCreate, invoke that constructor.
+      Constructor<?> constructor = ReflectUtils.getAnnotatedConstructor(targetClass);
+      if (constructor != null) {
+        return JavaBeanUtils.getConstructorCreator(
+            targetClass, constructor, schema, GetterTypeSupplier.INSTANCE);
+      }
+
+      return setterBasedFactory.create(targetClass, schema);
+    };
   }
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
     return (Class<?> targetClass, Schema schema) ->
-        JavaBeanUtils.getFieldTypes(targetClass, schema, new GetterTypeSupplier());
+        JavaBeanUtils.getFieldTypes(targetClass, schema, GetterTypeSupplier.INSTANCE);
   }
 
   /** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */
   public static class JavaBeanSetterFactory implements FieldValueSetterFactory {
     @Override
     public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
-      return JavaBeanUtils.getSetters(targetClass, schema, new SetterTypeSupplier());
+      return JavaBeanUtils.getSetters(targetClass, schema, SetterTypeSupplier.INSTANCE);
     }
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index 1504717..73e8dac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -18,12 +18,17 @@
 package org.apache.beam.sdk.schemas;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
 import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
 import org.apache.beam.sdk.schemas.utils.POJOUtils;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils;
@@ -47,42 +52,81 @@ public class JavaFieldSchema extends GetterBasedSchemaProvider {
   /** {@link FieldValueTypeSupplier} that's based on public fields. */
   @VisibleForTesting
   public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier {
+    public static final JavaFieldTypeSupplier INSTANCE = new JavaFieldTypeSupplier();
+
     @Override
-    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
-      Map<String, FieldValueTypeInformation> types =
+    public List<FieldValueTypeInformation> get(Class<?> clazz) {
+      List<FieldValueTypeInformation> types =
           ReflectUtils.getFields(clazz)
               .stream()
+              .filter(f -> !f.isAnnotationPresent(SchemaIgnore.class))
               .map(FieldValueTypeInformation::forField)
-              .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
-      // Return the list ordered by the schema fields.
-      return schema
-          .getFields()
-          .stream()
-          .map(f -> types.get(f.getName()))
-          .collect(Collectors.toList());
+              .map(
+                  t -> {
+                    SchemaFieldName fieldName = t.getField().getAnnotation(SchemaFieldName.class);
+                    return (fieldName != null) ? t.withName(fieldName.value()) : t;
+                  })
+              .collect(Collectors.toList());
+
+      // If there are no creators registered, then make sure none of the schema fields are final,
+      // as we (currently) have no way of creating classes in this case.
+      if (ReflectUtils.getAnnotatedCreateMethod(clazz) == null
+          && ReflectUtils.getAnnotatedConstructor(clazz) == null) {
+        Optional<Field> finalField =
+            types
+                .stream()
+                .map(FieldValueTypeInformation::getField)
+                .filter(f -> Modifier.isFinal(f.getModifiers()))
+                .findAny();
+        if (finalField.isPresent()) {
+          throw new IllegalArgumentException(
+              "Class "
+                  + clazz
+                  + " has final fields and no "
+                  + "registered creator. Cannot use as schema, as we don't know how to create this "
+                  + "object automatically");
+        }
+      }
+      return types;
     }
   }
 
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return POJOUtils.schemaFromPojoClass(typeDescriptor.getRawType());
+    return POJOUtils.schemaFromPojoClass(
+        typeDescriptor.getRawType(), JavaFieldTypeSupplier.INSTANCE);
   }
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
     return (Class<?> targetClass, Schema schema) ->
-        POJOUtils.getGetters(targetClass, schema, new JavaFieldTypeSupplier());
+        POJOUtils.getGetters(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
   }
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
     return (Class<?> targetClass, Schema schema) ->
-        POJOUtils.getFieldTypes(targetClass, schema, new JavaFieldTypeSupplier());
+        POJOUtils.getFieldTypes(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
   }
 
   @Override
   UserTypeCreatorFactory schemaTypeCreatorFactory() {
-    return (Class<?> targetClass, Schema schema) ->
-        POJOUtils.getCreator(targetClass, schema, new JavaFieldTypeSupplier());
+    return (Class<?> targetClass, Schema schema) -> {
+      // If a static method is marked with @SchemaCreate, use that.
+      Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
+      if (annotated != null) {
+        return POJOUtils.getStaticCreator(
+            targetClass, annotated, schema, JavaFieldTypeSupplier.INSTANCE);
+      }
+
+      // If a Constructor was tagged with @SchemaCreate, invoke that constructor.
+      Constructor<?> constructor = ReflectUtils.getAnnotatedConstructor(targetClass);
+      if (constructor != null) {
+        return POJOUtils.getConstructorCreator(
+            targetClass, constructor, schema, JavaFieldTypeSupplier.INSTANCE);
+      }
+
+      return POJOUtils.getSetFieldCreator(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
+    };
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java
index a38af7e..a2daaa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java
similarity index 94%
rename from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java
index c5cc20e..a364b8c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.schemas;
+package org.apache.beam.sdk.schemas.annotations;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -33,6 +33,9 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.SchemaProviderRegistrar;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -97,7 +100,7 @@ public @interface DefaultSchema {
                       + " specified as the default SchemaProvider for type "
                       + type
                       + ". Make "
-                      + " sure that this class has a default constructor.",
+                      + " sure that this class has a public default constructor.",
                   e);
             }
           });
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaCreate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaCreate.java
new file mode 100644
index 0000000..a3c51b2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaCreate.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Can be put on a constructor or a static method, in which case that constructor or method will be
+ * used to create instances of the class by Beam's schema code.
+ *
+ * <p>For example, the following Java POJO.
+ *
+ * <pre><code>
+ *   {@literal @}DefaultSchema(JavaBeanSchema.class)
+ *  class MyClass {
+ *    public final String user;
+ *    public final int age;
+ *
+ *    {@literal @}SchemaCreate
+ *    public MyClass(String user, int age) {
+ *      this.user = user;
+ *      this.age = age;
+ *    }
+ *  }
+ * </code></pre>
+ *
+ * <p>This tells Beam that this constructor can be used to construct instances. Beam will match up
+ * the names of the constructor arguments to schema fields in order to decide how to create the
+ * class from a Row.
+ *
+ * <p>This can also be used to annotate a static factory method on the class. For example:
+ *
+ * <pre><code>
+ * {@literal @}DefaultSchema(JavaBeanSchema.class)
+ * class MyClass {
+ *   public final String user;
+ *   public final int age;
+ *
+ *   private MyClass(String user, int age) { this.user = user; this.age = age; }
+ *
+ *   {@literal @}SchemaCreate
+ *   public static MyClass create(String user, int age) {
+ *      return new MyClass(user, age);
+ *    }
+ * }
+ * </code></pre>
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.CONSTRUCTOR, ElementType.METHOD})
+@SuppressWarnings("rawtypes")
+@Experimental(Kind.SCHEMAS)
+public @interface SchemaCreate {}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldName.java
new file mode 100644
index 0000000..88c1de8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldName.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * When used on a POJO field or a JavaBean getter, the specified name is used for the generated
+ * schema field.
+ *
+ * <p>For example, a Java POJO with a field that we want in our schema but under a different name.
+ *
+ * <pre><code>
+ *   {@literal @}DefaultSchema(JavaBeanSchema.class)
+ *   class MyClass {
+ *     public String user;
+ *     {@literal @}SchemaFieldName("age")
+ *     public int ageInYears;
+ *   }
+ * </code></pre>
+ *
+ * <p>The resulting schema will have fields named "user" and "age."
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+@SuppressWarnings("rawtypes")
+@Experimental(Kind.SCHEMAS)
+public @interface SchemaFieldName {
+  @Nonnull
+  String value();
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaIgnore.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaIgnore.java
new file mode 100644
index 0000000..b7419d5
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaIgnore.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * When used on a POJO field or a JavaBean getter, that field or getter is omitted from the inferred
+ * schema.
+ *
+ * <p>For example, a Java POJO with a field that we don't want included in the schema.
+ *
+ * <pre><code>
+ *   {@literal @}DefaultSchema(JavaBeanSchema.class)
+ *   class MyClass {
+ *     public String user;
+ *     public int age;
+ *     {@literal @}SchemaIgnore public String pleaseDontAddToSchema;
+ *   }
+ * </code></pre>
+ *
+ * <p>In this case, the pleaseDontAddToSchema field will be excluded from the schema, and
+ * implicitly dropped from calculations.
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+@SuppressWarnings("rawtypes")
+@Experimental(Kind.SCHEMAS)
+public @interface SchemaIgnore {}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/package-info.java
similarity index 57%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/package-info.java
index 12330ec..99a5a9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/package-info.java
@@ -15,21 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.Schema;
-
 /**
- * A naming policy for schema fields. This maps a name from the class (field name or getter name) to
- * the matching field name in the schema.
+ * Defines annotations for customizing how a {@link org.apache.beam.sdk.schemas.Schema} is inferred
+ * from POJO and JavaBean classes.
+ *
+ * <p>For further details, see the documentation for each class in this package.
  */
-public interface FieldValueTypeSupplier extends Serializable {
-  /**
-   * Return all the FieldValueTypeInformations. The returned list must be in the same order as
-   * fields in the schema.
-   */
-  List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema);
-}
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.schemas.annotations;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 43e6a5c..177ff64 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -302,26 +302,27 @@ public class AvroUtils {
   private static final class AvroSpecificRecordFieldValueTypeSupplier
       implements FieldValueTypeSupplier {
     @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz) {
+      throw new RuntimeException("Unexpected call.");
+    }
+
+    @Override
     public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
       Map<String, String> mapping = getMapping(schema);
-      Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
+      List<FieldValueTypeInformation> types = Lists.newArrayList();
       for (Method method : ReflectUtils.getMethods(clazz)) {
         if (ReflectUtils.isGetter(method)) {
           FieldValueTypeInformation fieldValueTypeInformation =
               FieldValueTypeInformation.forGetter(method);
           String name = mapping.get(fieldValueTypeInformation.getName());
           if (name != null) {
-            types.put(name, fieldValueTypeInformation.withName(name));
+            types.add(fieldValueTypeInformation.withName(name));
           }
         }
       }
 
       // Return the list ordered by the schema fields.
-      return schema
-          .getFields()
-          .stream()
-          .map(f -> types.get(f.getName()))
-          .collect(Collectors.toList());
+      return StaticSchemaInference.sortBySchema(types, schema);
     }
 
     private Map<String, String> getMapping(Schema schema) {
@@ -340,7 +341,7 @@ public class AvroUtils {
 
   private static final class AvroPojoFieldValueTypeSupplier implements FieldValueTypeSupplier {
     @Override
-    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+    public List<FieldValueTypeInformation> get(Class<?> clazz) {
       Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
       for (java.lang.reflect.Field f : ReflectUtils.getFields(clazz)) {
         if (!f.isAnnotationPresent(AvroIgnore.class)) {
@@ -352,12 +353,7 @@ public class AvroUtils {
           types.put(typeInformation.getName(), typeInformation);
         }
       }
-      // Return the list ordered by the schema fields.
-      return schema
-          .getFields()
-          .stream()
-          .map(f -> types.get(f.getName()))
-          .collect(Collectors.toList());
+      return Lists.newArrayList(types.values());
     }
   }
 
@@ -386,7 +382,7 @@ public class AvroUtils {
     if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) {
       return AvroByteBuddyUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema);
     } else {
-      return POJOUtils.getCreator(clazz, schema, new AvroPojoFieldValueTypeSupplier());
+      return POJOUtils.getSetFieldCreator(clazz, schema, new AvroPojoFieldValueTypeSupplier());
     }
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index a0d40c8..a062496 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -17,6 +17,14 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Parameter;
 import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -27,9 +35,15 @@ import java.util.Map;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.NamingStrategy;
 import net.bytebuddy.NamingStrategy.SuffixingRandom.BaseNameResolver;
+import net.bytebuddy.description.method.MethodDescription.ForLoadedConstructor;
+import net.bytebuddy.description.method.MethodDescription.ForLoadedMethod;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
 import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
 import net.bytebuddy.implementation.bytecode.Duplication;
 import net.bytebuddy.implementation.bytecode.StackManipulation;
 import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
@@ -37,13 +51,18 @@ import net.bytebuddy.implementation.bytecode.TypeCreation;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
 import net.bytebuddy.implementation.bytecode.collection.ArrayFactory;
+import net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
 import net.bytebuddy.matcher.ElementMatchers;
 import net.bytebuddy.utility.RandomString;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;
 import org.apache.commons.lang3.ArrayUtils;
@@ -595,4 +614,163 @@ class ByteBuddyUtils {
       return readValue;
     }
   }
+
+  /**
+   * Invokes a constructor registered using SchemaCreate. As constructor parameters might not be in
+   * the same order as the schema fields, reorders the parameters as necessary before calling the
+   * constructor.
+   */
+  static class ConstructorCreateInstruction extends InvokeUserCreateInstruction {
+    private final Constructor constructor;
+
+    ConstructorCreateInstruction(
+        List<FieldValueTypeInformation> fields, Class targetClass, Constructor constructor) {
+      super(fields, targetClass, Lists.newArrayList(constructor.getParameters()));
+      this.constructor = constructor;
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    protected StackManipulation beforePushingParameters() {
+      // Create the target class.
+      ForLoadedType loadedType = new ForLoadedType(targetClass);
+      return new StackManipulation.Compound(TypeCreation.of(loadedType), Duplication.SINGLE);
+    }
+
+    @Override
+    protected StackManipulation afterPushingParameters() {
+      return MethodInvocation.invoke(new ForLoadedConstructor(constructor));
+    }
+  }
+
+  /**
+   * Invokes a static factory method registered using SchemaCreate. As the method parameters might
+   * not be in the same order as the schema fields, reorders the parameters as necessary before
+   * calling the factory method.
+   */
+  static class StaticFactoryMethodInstruction extends InvokeUserCreateInstruction {
+    private final Method creator;
+
+    StaticFactoryMethodInstruction(
+        List<FieldValueTypeInformation> fields, Class targetClass, Method creator) {
+      super(fields, targetClass, Lists.newArrayList(creator.getParameters()));
+      if (!Modifier.isStatic(creator.getModifiers())) {
+        throw new IllegalArgumentException("Method " + creator + " is not static");
+      }
+      this.creator = creator;
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    protected StackManipulation afterPushingParameters() {
+      return MethodInvocation.invoke(new ForLoadedMethod(creator));
+    }
+  }
+
+  static class InvokeUserCreateInstruction implements Implementation {
+    protected final List<FieldValueTypeInformation> fields;
+    protected final Class targetClass;
+    protected final List<Parameter> parameters;
+    protected final Map<Integer, Integer> fieldMapping;
+
+    protected InvokeUserCreateInstruction(
+        List<FieldValueTypeInformation> fields, Class targetClass, List<Parameter> parameters) {
+      this.fields = fields;
+      this.targetClass = targetClass;
+      this.parameters = parameters;
+
+      // Method parameters might not be in the same order as the schema fields, and the input
+      // array to SchemaUserTypeCreator.create is in schema order. Examine the parameter names
+      // and compare against field names to calculate the mapping between the two lists.
+      Map<String, Integer> fieldsByLogicalName = Maps.newHashMap();
+      Map<String, Integer> fieldsByJavaClassMember = Maps.newHashMap();
+      for (int i = 0; i < fields.size(); ++i) {
+        // Method parameters are allowed to either correspond to the schema field names or to the
+        // actual Java field or method names.
+        FieldValueTypeInformation fieldValue = checkNotNull(fields.get(i));
+        fieldsByLogicalName.put(fieldValue.getName(), i);
+        if (fieldValue.getField() != null) {
+          fieldsByJavaClassMember.put(fieldValue.getField().getName(), i);
+        } else if (fieldValue.getMethod() != null) {
+          String name = ReflectUtils.stripPrefix(fieldValue.getMethod().getName(), "set");
+          fieldsByJavaClassMember.put(name, i);
+        }
+      }
+
+      fieldMapping = Maps.newHashMap();
+      for (int i = 0; i < parameters.size(); ++i) {
+        Parameter parameter = parameters.get(i);
+        String paramName = parameter.getName();
+        Integer index = fieldsByLogicalName.get(paramName);
+        if (index == null) {
+          index = fieldsByJavaClassMember.get(paramName);
+        }
+        if (index == null) {
+          throw new RuntimeException(
+              "Creator parameter " + paramName + " Doesn't correspond to a schema field");
+        }
+        fieldMapping.put(i, index);
+      }
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        // this + method parameters.
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+        StackManipulation stackManipulation = beforePushingParameters();
+
+        // Push all creator parameters on the stack.
+        ConvertType convertType = new ConvertType(true);
+        for (int i = 0; i < parameters.size(); i++) {
+          Parameter parameter = parameters.get(i);
+          ForLoadedType convertedType =
+              new ForLoadedType(
+                  (Class) convertType.convert(TypeDescriptor.of(parameter.getType())));
+
+          // The instruction to read the parameter. Use the fieldMapping to reorder parameters as
+          // necessary.
+          StackManipulation readParameter =
+              new StackManipulation.Compound(
+                  MethodVariableAccess.REFERENCE.loadFrom(1),
+                  IntegerConstant.forValue(fieldMapping.get(i)),
+                  ArrayAccess.REFERENCE.load(),
+                  TypeCasting.to(convertedType));
+          stackManipulation =
+              new StackManipulation.Compound(
+                  stackManipulation,
+                  new ConvertValueForSetter(readParameter)
+                      .convert(TypeDescriptor.of(parameter.getType())));
+        }
+        stackManipulation =
+            new StackManipulation.Compound(
+                stackManipulation, afterPushingParameters(), MethodReturn.REFERENCE);
+
+        StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+        return new Size(size.getMaximalSize(), numLocals);
+      };
+    }
+
+    protected StackManipulation beforePushingParameters() {
+      return new StackManipulation.Compound();
+    }
+
+    protected StackManipulation afterPushingParameters() {
+      return new StackManipulation.Compound();
+    }
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
index 12330ec..d93456b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
@@ -27,9 +27,16 @@ import org.apache.beam.sdk.schemas.Schema;
  * the matching field name in the schema.
  */
 public interface FieldValueTypeSupplier extends Serializable {
+  /** Return all the FieldValueTypeInformations. */
+  List<FieldValueTypeInformation> get(Class<?> clazz);
+
   /**
-   * Return all the FieldValueTypeInformations. The returned list must be in the same order as
+   * Return all the FieldValueTypeInformations.
+   *
+   * <p>If the schema parameter is not null, then the returned list must be in the same order as
    * fields in the schema.
    */
-  List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema);
+  default List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+    return StaticSchemaInference.sortBySchema(get(clazz), schema);
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 58cc0ed..bb9a76c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.schemas.utils;
 
 import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -44,8 +45,11 @@ import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConstructorCreateInstruction;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.StaticFactoryMethodInstruction;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
@@ -53,33 +57,21 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
 @Experimental(Kind.SCHEMAS)
 public class JavaBeanUtils {
   /** Create a {@link Schema} for a Java Bean class. */
-  public static Schema schemaFromJavaBeanClass(Class<?> clazz) {
-    return StaticSchemaInference.schemaFromClass(clazz, JavaBeanUtils::typeInformationFromClass);
+  public static Schema schemaFromJavaBeanClass(
+      Class<?> clazz, FieldValueTypeSupplier fieldValueTypeSupplier) {
+    return StaticSchemaInference.schemaFromClass(clazz, fieldValueTypeSupplier);
   }
 
-  private static List<FieldValueTypeInformation> typeInformationFromClass(Class<?> clazz) {
-    List<FieldValueTypeInformation> getterTypes =
-        ReflectUtils.getMethods(clazz)
-            .stream()
-            .filter(ReflectUtils::isGetter)
-            .map(FieldValueTypeInformation::forGetter)
-            .collect(Collectors.toList());
-
-    Map<String, FieldValueTypeInformation> setterTypes =
-        ReflectUtils.getMethods(clazz)
+  // Make sure that there are matching setters and getters.
+  public static void validateJavaBean(
+      List<FieldValueTypeInformation> getters, List<FieldValueTypeInformation> setters) {
+    Map<String, FieldValueTypeInformation> setterMap =
+        setters
             .stream()
-            .filter(ReflectUtils::isSetter)
-            .map(FieldValueTypeInformation::forSetter)
             .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
-    validateJavaBean(getterTypes, setterTypes);
-    return getterTypes;
-  }
 
-  // Make sure that there are matching setters and getters.
-  private static void validateJavaBean(
-      List<FieldValueTypeInformation> getters, Map<String, FieldValueTypeInformation> setters) {
     for (FieldValueTypeInformation type : getters) {
-      FieldValueTypeInformation setterType = setters.get(type.getName());
+      FieldValueTypeInformation setterType = setterMap.get(type.getName());
       if (setterType == null) {
         throw new RuntimeException(
             "JavaBean contained a getter for field "
@@ -218,6 +210,84 @@ public class JavaBeanUtils {
         .intercept(new InvokeSetterInstruction(method));
   }
 
+  // Creators are cached per class and schema, so the generated creator class is only built the
+  // first time getConstructorCreator or getStaticCreator is called for that pair.
+  public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS =
+      Maps.newConcurrentMap();
+
+  public static SchemaUserTypeCreator getConstructorCreator(
+      Class clazz,
+      Constructor constructor,
+      Schema schema,
+      FieldValueTypeSupplier fieldValueTypeSupplier) {
+    return CACHED_CREATORS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+          return createConstructorCreator(clazz, constructor, schema, types);
+        });
+  }
+
+  public static <T> SchemaUserTypeCreator createConstructorCreator(
+      Class<T> clazz,
+      Constructor<T> constructor,
+      Schema schema,
+      List<FieldValueTypeInformation> types) {
+    try {
+      DynamicType.Builder<SchemaUserTypeCreator> builder =
+          BYTE_BUDDY
+              .subclass(SchemaUserTypeCreator.class)
+              .method(ElementMatchers.named("create"))
+              .intercept(new ConstructorCreateInstruction(types, clazz, constructor));
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException(
+          "Unable to generate a creator for class " + clazz + " with schema " + schema);
+    }
+  }
+
+  public static SchemaUserTypeCreator getStaticCreator(
+      Class clazz, Method creator, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
+    return CACHED_CREATORS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+          return createStaticCreator(clazz, creator, schema, types);
+        });
+  }
+
+  public static <T> SchemaUserTypeCreator createStaticCreator(
+      Class<T> clazz, Method creator, Schema schema, List<FieldValueTypeInformation> types) {
+    try {
+      DynamicType.Builder<SchemaUserTypeCreator> builder =
+          BYTE_BUDDY
+              .subclass(SchemaUserTypeCreator.class)
+              .method(ElementMatchers.named("create"))
+              .intercept(new StaticFactoryMethodInstruction(types, clazz, creator));
+
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException(
+          "Unable to generate a creator for " + clazz + " with schema " + schema);
+    }
+  }
+
   // Implements a method to read a public getter out of an object.
   private static class InvokeGetterInstruction implements Implementation {
     private final FieldValueTypeInformation typeInformation;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index 2c22490..07c5915 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -18,11 +18,12 @@
 package org.apache.beam.sdk.schemas.utils;
 
 import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import net.bytebuddy.ByteBuddy;
@@ -53,9 +54,11 @@ import org.apache.beam.sdk.schemas.FieldValueSetter;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConstructorCreateInstruction;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.InjectPackageStrategy;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.StaticFactoryMethodInstruction;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -63,14 +66,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /** A set of utilities yo generate getter and setter classes for POJOs. */
 @Experimental(Kind.SCHEMAS)
 public class POJOUtils {
-  public static Schema schemaFromPojoClass(Class<?> clazz) {
-    Function<Class, List<FieldValueTypeInformation>> getTypesForClass =
-        c ->
-            ReflectUtils.getFields(c)
-                .stream()
-                .map(FieldValueTypeInformation::forField)
-                .collect(Collectors.toList());
-    return StaticSchemaInference.schemaFromClass(clazz, getTypesForClass);
+  public static Schema schemaFromPojoClass(
+      Class<?> clazz, FieldValueTypeSupplier fieldValueTypeSupplier) {
+    return StaticSchemaInference.schemaFromClass(clazz, fieldValueTypeSupplier);
   }
 
   // Static ByteBuddy instance used by all helpers.
@@ -112,17 +110,17 @@ public class POJOUtils {
   public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS =
       Maps.newConcurrentMap();
 
-  public static <T> SchemaUserTypeCreator getCreator(
+  public static <T> SchemaUserTypeCreator getSetFieldCreator(
       Class<T> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
     return CACHED_CREATORS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
           List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
-          return createCreator(clazz, schema, types);
+          return createSetFieldCreator(clazz, schema, types);
         });
   }
 
-  private static <T> SchemaUserTypeCreator createCreator(
+  private static <T> SchemaUserTypeCreator createSetFieldCreator(
       Class<T> clazz, Schema schema, List<FieldValueTypeInformation> types) {
     // Get the list of class fields ordered by schema.
     List<Field> fields =
@@ -133,7 +131,81 @@ public class POJOUtils {
               .with(new InjectPackageStrategy(clazz))
               .subclass(SchemaUserTypeCreator.class)
               .method(ElementMatchers.named("create"))
-              .intercept(new CreateInstruction(fields, clazz));
+              .intercept(new SetFieldCreateInstruction(fields, clazz));
+
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException(
+          "Unable to generate a creator for " + clazz + " with schema " + schema);
+    }
+  }
+
+  public static SchemaUserTypeCreator getConstructorCreator(
+      Class clazz,
+      Constructor constructor,
+      Schema schema,
+      FieldValueTypeSupplier fieldValueTypeSupplier) {
+    return CACHED_CREATORS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+          return createConstructorCreator(clazz, constructor, schema, types);
+        });
+  }
+
+  public static <T> SchemaUserTypeCreator createConstructorCreator(
+      Class<T> clazz,
+      Constructor<T> constructor,
+      Schema schema,
+      List<FieldValueTypeInformation> types) {
+    try {
+      DynamicType.Builder<SchemaUserTypeCreator> builder =
+          BYTE_BUDDY
+              .subclass(SchemaUserTypeCreator.class)
+              .method(ElementMatchers.named("create"))
+              .intercept(new ConstructorCreateInstruction(types, clazz, constructor));
+
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException(
+          "Unable to generate a creator for " + clazz + " with schema " + schema);
+    }
+  }
+
+  public static SchemaUserTypeCreator getStaticCreator(
+      Class clazz, Method creator, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
+    return CACHED_CREATORS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+          return createStaticCreator(clazz, creator, schema, types);
+        });
+  }
+
+  public static <T> SchemaUserTypeCreator createStaticCreator(
+      Class<T> clazz, Method creator, Schema schema, List<FieldValueTypeInformation> types) {
+    try {
+      DynamicType.Builder<SchemaUserTypeCreator> builder =
+          BYTE_BUDDY
+              .subclass(SchemaUserTypeCreator.class)
+              .method(ElementMatchers.named("create"))
+              .intercept(new StaticFactoryMethodInstruction(types, clazz, creator));
 
       return builder
           .make()
@@ -345,11 +417,11 @@ public class POJOUtils {
   }
 
   // Implements a method to construct an object.
-  static class CreateInstruction implements Implementation {
+  static class SetFieldCreateInstruction implements Implementation {
     private final List<Field> fields;
     private final Class pojoClass;
 
-    CreateInstruction(List<Field> fields, Class pojoClass) {
+    SetFieldCreateInstruction(List<Field> fields, Class pojoClass) {
       this.fields = fields;
       this.pojoClass = pojoClass;
     }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
index 26b3d70..300b8f11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
@@ -21,16 +21,20 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.security.InvalidParameterException;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
 
 /** A set of reflection helper methods. */
 public class ReflectUtils {
@@ -62,6 +66,7 @@ public class ReflectUtils {
   }
 
   private static final Map<Class, List<Method>> DECLARED_METHODS = Maps.newHashMap();
+  private static final Map<Class, Method> ANNOTATED_CONSTRUCTORS = Maps.newHashMap();
   private static final Map<Class, List<Field>> DECLARED_FIELDS = Maps.newHashMap();
 
   /** Returns the list of public, non-static methods in the class, caching the results. */
@@ -77,6 +82,40 @@ public class ReflectUtils {
         });
   }
 
+  @Nullable
+  public static Constructor getAnnotatedConstructor(Class clazz) {
+    return Arrays.stream(clazz.getDeclaredConstructors())
+        .filter(m -> !Modifier.isPrivate(m.getModifiers()))
+        .filter(m -> !Modifier.isProtected(m.getModifiers()))
+        .filter(m -> m.getAnnotation(SchemaCreate.class) != null)
+        .findFirst()
+        .orElse(null);
+  }
+
+  @Nullable
+  public static Method getAnnotatedCreateMethod(Class clazz) {
+    return ANNOTATED_CONSTRUCTORS.computeIfAbsent(
+        clazz,
+        c -> {
+          Method method =
+              Arrays.stream(clazz.getDeclaredMethods())
+                  .filter(m -> !Modifier.isPrivate(m.getModifiers()))
+                  .filter(m -> !Modifier.isProtected(m.getModifiers()))
+                  .filter(m -> Modifier.isStatic(m.getModifiers()))
+                  .filter(m -> m.getAnnotation(SchemaCreate.class) != null)
+                  .findFirst()
+                  .orElse(null);
+          if (method != null && !clazz.isAssignableFrom(method.getReturnType())) {
+            throw new InvalidParameterException(
+                "A method marked with SchemaCreate in class "
+                    + clazz
+                    + " does not return a type assignable to "
+                    + clazz);
+          }
+          return method;
+        });
+  }
+
   // Get all public, non-static, non-transient fields.
   public static List<Field> getFields(Class<?> clazz) {
     return DECLARED_FIELDS.computeIfAbsent(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index d3f81e8..799b7d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -35,6 +36,19 @@ import org.joda.time.ReadableInstant;
 
 /** A set of utilities for inferring a Beam {@link Schema} from static Java types. */
 public class StaticSchemaInference {
+  public static List<FieldValueTypeInformation> sortBySchema( // returns the entries of types in schema field order
+      List<FieldValueTypeInformation> types, Schema schema) {
+    Map<String, FieldValueTypeInformation> typeMap = // index of the inputs keyed by getName()
+        types
+            .stream() // the two-argument toMap below throws IllegalStateException on a duplicate name
+            .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
+    return schema
+        .getFields()
+        .stream() // one output element per schema field, in schema order
+        .map(f -> typeMap.get(f.getName())) // null for a schema field that has no entry in types
+        .collect(Collectors.toList());
+  }
+
   enum MethodType {
     GETTER,
     SETTER
@@ -67,10 +81,10 @@ public class StaticSchemaInference {
    * public getter methods, or special annotations on the class.
    */
   public static Schema schemaFromClass(
-      Class<?> clazz, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) {
+      Class<?> clazz, FieldValueTypeSupplier fieldValueTypeSupplier) {
     Schema.Builder builder = Schema.builder();
-    for (FieldValueTypeInformation type : getTypesForClass.apply(clazz)) {
-      Schema.FieldType fieldType = fieldFromType(type.getType(), getTypesForClass);
+    for (FieldValueTypeInformation type : fieldValueTypeSupplier.get(clazz)) {
+      Schema.FieldType fieldType = fieldFromType(type.getType(), fieldValueTypeSupplier);
       if (type.isNullable()) {
         builder.addNullableField(type.getName(), fieldType);
       } else {
@@ -82,7 +96,7 @@ public class StaticSchemaInference {
 
   // Map a Java field type to a Beam Schema FieldType.
   private static Schema.FieldType fieldFromType(
-      TypeDescriptor type, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) {
+      TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier) {
     FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType());
     if (primitiveType != null) {
       return primitiveType;
@@ -95,7 +109,7 @@ public class StaticSchemaInference {
         return FieldType.BYTES;
       } else {
         // Otherwise this is an array type.
-        return FieldType.array(fieldFromType(component, getTypesForClass));
+        return FieldType.array(fieldFromType(component, fieldValueTypeSupplier));
       }
     } else if (type.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
       TypeDescriptor<Collection<?>> collection = type.getSupertype(Collection.class);
@@ -103,7 +117,7 @@ public class StaticSchemaInference {
         ParameterizedType ptype = (ParameterizedType) collection.getType();
         java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
         checkArgument(params.length == 1);
-        return FieldType.array(fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass));
+        return FieldType.array(fieldFromType(TypeDescriptor.of(params[0]), fieldValueTypeSupplier));
       } else {
         throw new RuntimeException("Cannot infer schema from unparameterized collection.");
       }
@@ -113,8 +127,8 @@ public class StaticSchemaInference {
         ParameterizedType ptype = (ParameterizedType) map.getType();
         java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
         checkArgument(params.length == 2);
-        FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass);
-        FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), getTypesForClass);
+        FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), fieldValueTypeSupplier);
+        FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), fieldValueTypeSupplier);
         checkArgument(
             keyType.getTypeName().isPrimitiveType(),
             "Only primitive types can be map keys. type: " + keyType.getTypeName());
@@ -129,7 +143,7 @@ public class StaticSchemaInference {
     } else if (type.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
       return FieldType.BYTES;
     } else {
-      return FieldType.row(schemaFromClass(type.getRawType(), getTypesForClass));
+      return FieldType.row(schemaFromClass(type.getRawType(), fieldValueTypeSupplier));
     }
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
index 16b5a5f..9e8cd64 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
@@ -37,15 +37,19 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArraysBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedMapBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBeanWithAnnotations;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /** Tests for the {@link JavaBeanSchema} schema provider. */
 public class JavaBeanSchemaTest {
@@ -67,6 +71,21 @@ public class JavaBeanSchemaTest {
         new StringBuilder(name).append("builder"));
   }
 
+  private SimpleBeanWithAnnotations createAnnotated(String name) { // fixture for testAnnotations
+    return new SimpleBeanWithAnnotations( // the @SchemaCreate constructor declared in TestJavaBeans
+        name,
+        (byte) 1,
+        (short) 2,
+        3,
+        4L,
+        true,
+        DATE,
+        DATE.toInstant(),
+        BYTE_ARRAY, // the constructor also wraps this array to fill byteBuffer
+        BigDecimal.ONE,
+        new StringBuilder(name).append("builder")); // e.g. "string" -> "stringbuilder"
+  }
+
   private Row createSimpleRow(String name) {
     return Row.withSchema(SIMPLE_BEAN_SCHEMA)
         .addValues(
@@ -238,9 +257,9 @@ public class JavaBeanSchemaTest {
     NestedArrayBean bean = new NestedArrayBean(simple1, simple2, simple3);
     Row row = registry.getToRowFunction(NestedArrayBean.class).apply(bean);
     List<Row> rows = row.getArray("beans");
-    assertSame(simple1, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(0)));
-    assertSame(simple2, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(1)));
-    assertSame(simple3, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(2)));
+    assertSame(simple1, registry.getFromRowFunction(SimpleBean.class).apply(rows.get(0)));
+    assertSame(simple2, registry.getFromRowFunction(SimpleBean.class).apply(rows.get(1)));
+    assertSame(simple3, registry.getFromRowFunction(SimpleBean.class).apply(rows.get(2)));
   }
 
   @Test
@@ -250,7 +269,6 @@ public class JavaBeanSchemaTest {
     Row row1 = createSimpleRow("string1");
     Row row2 = createSimpleRow("string2");
     Row row3 = createSimpleRow("string3");
-    ;
 
     Row row = Row.withSchema(NESTED_ARRAY_BEAN_SCHEMA).addArray(row1, row2, row3).build();
     NestedArrayBean bean = registry.getFromRowFunction(NestedArrayBean.class).apply(row);
@@ -334,4 +352,42 @@ public class JavaBeanSchemaTest {
     assertEquals("string2", bean.getMap().get("simple2").getStr());
     assertEquals("string3", bean.getMap().get("simple3").getStr());
   }
+
+  @Test
+  public void testAnnotations() throws NoSuchSchemaException { // round-trips SimpleBeanWithAnnotations
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(SimpleBeanWithAnnotations.class);
+    SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); // must match the plain schema
+
+    SimpleBeanWithAnnotations pojo = createAnnotated("string");
+    Row row = registry.getToRowFunction(SimpleBeanWithAnnotations.class).apply(pojo);
+    assertEquals(12, row.getFieldCount()); // getUnknown() is @SchemaIgnore; no 13th field expected
+    assertEquals("string", row.getString("str"));
+    assertEquals((byte) 1, (Object) row.getByte("aByte")); // via @SchemaFieldName("aByte")
+    assertEquals((short) 2, (Object) row.getInt16("aShort")); // via @SchemaFieldName("aShort")
+    assertEquals((int) 3, (Object) row.getInt32("anInt"));
+    assertEquals((long) 4, (Object) row.getInt64("aLong"));
+    assertEquals(true, (Object) row.getBoolean("aBoolean"));
+    assertEquals(DATE.toInstant(), row.getDateTime("dateTime"));
+    assertEquals(DATE.toInstant(), row.getDateTime("instant"));
+    assertArrayEquals(BYTE_ARRAY, row.getBytes("bytes"));
+    assertArrayEquals(BYTE_ARRAY, row.getBytes("byteBuffer")); // constructor wraps bytes here
+    assertEquals(BigDecimal.ONE, row.getDecimal("bigDecimal"));
+    assertEquals("stringbuilder", row.getString("stringBuilder"));
+
+    SimpleBeanWithAnnotations pojo2 = // reverse direction: Row -> bean
+        registry
+            .getFromRowFunction(SimpleBeanWithAnnotations.class)
+            .apply(createSimpleRow("string"));
+    assertEquals(pojo, pojo2); // compared through the bean's equals()
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testMismatchingNullable() throws NoSuchSchemaException { // schema lookup must fail
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    thrown.expect(RuntimeException.class); // armed before the call that is expected to throw
+    Schema schema = registry.getSchema(MismatchingNullableBean.class); // result unused; only the throw matters
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
index 87c42f2..31158bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.AnnotatedSimplePojo;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO;
@@ -49,6 +50,7 @@ import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNestedNullable;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNullables;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.PrimitiveArrayPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.StaticCreationSimplePojo;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
@@ -78,6 +80,38 @@ public class JavaFieldSchemaTest {
         new StringBuilder(name).append("builder"));
   }
 
+  private AnnotatedSimplePojo createAnnotated(String name) { // fixture for testAnnotations
+    return new AnnotatedSimplePojo( // constructor not visible here; presumably the @SchemaCreate one - confirm
+        name,
+        (byte) 1,
+        4L,
+        (short) 2,
+        3,
+        true,
+        DATE,
+        BigDecimal.ONE,
+        INSTANT,
+        BYTE_ARRAY,
+        BYTE_BUFFER,
+        new StringBuilder(name).append("builder")); // testAnnotations expects createSimpleRow(name) back
+  }
+
+  private StaticCreationSimplePojo createStaticCreation(String name) { // fixture for testStaticCreator
+    return StaticCreationSimplePojo.of( // built through the static factory, not a constructor
+        name,
+        4L,
+        (byte) 1,
+        (short) 2,
+        3,
+        true,
+        DATE,
+        BYTE_BUFFER,
+        INSTANT,
+        BYTE_ARRAY,
+        BigDecimal.ONE,
+        new StringBuilder(name).append("builder")); // testStaticCreator expects createSimpleRow(name) back
+  }
+
   private Row createSimpleRow(String name) {
     return Row.withSchema(SIMPLE_POJO_SCHEMA)
         .addValues(
@@ -249,9 +283,9 @@ public class JavaFieldSchemaTest {
     NestedArrayPOJO pojo = new NestedArrayPOJO(simple1, simple2, simple3);
     Row row = registry.getToRowFunction(NestedArrayPOJO.class).apply(pojo);
     List<Row> rows = row.getArray("pojos");
-    assertSame(simple1, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(0)));
-    assertSame(simple2, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(1)));
-    assertSame(simple3, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(2)));
+    assertSame(simple1, registry.getFromRowFunction(SimplePOJO.class).apply(rows.get(0)));
+    assertSame(simple2, registry.getFromRowFunction(SimplePOJO.class).apply(rows.get(1)));
+    assertSame(simple3, registry.getFromRowFunction(SimplePOJO.class).apply(rows.get(2)));
   }
 
   @Test
@@ -386,4 +420,34 @@ public class JavaFieldSchemaTest {
         registry.getFromRowFunction(POJOWithNestedNullable.class).apply(row);
     assertNull(pojo.nested);
   }
+
+  @Test
+  public void testAnnotations() throws NoSuchSchemaException { // round-trips AnnotatedSimplePojo
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(AnnotatedSimplePojo.class);
+    SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema); // must match the plain schema
+
+    Row simpleRow = createSimpleRow("string");
+    AnnotatedSimplePojo pojo = createAnnotated("string"); // POJO -> Row is checked on the next line
+    assertEquals(simpleRow, registry.getToRowFunction(AnnotatedSimplePojo.class).apply(pojo));
+
+    AnnotatedSimplePojo pojo2 = // reverse direction: Row -> POJO
+        registry.getFromRowFunction(AnnotatedSimplePojo.class).apply(simpleRow);
+    assertEquals(pojo, pojo2); // compared through the POJO's equals()
+  }
+
+  @Test
+  public void testStaticCreator() throws NoSuchSchemaException { // round-trips StaticCreationSimplePojo
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(StaticCreationSimplePojo.class);
+    SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema); // must match the plain schema
+
+    Row simpleRow = createSimpleRow("string");
+    StaticCreationSimplePojo pojo = createStaticCreation("string"); // made via StaticCreationSimplePojo.of
+    assertEquals(simpleRow, registry.getToRowFunction(StaticCreationSimplePojo.class).apply(pojo));
+
+    StaticCreationSimplePojo pojo2 = // reverse direction: Row -> POJO
+        registry.getFromRowFunction(StaticCreationSimplePojo.class).apply(simpleRow);
+    assertEquals(pojo, pojo2); // compared through the POJO's equals()
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
index 2610873..195e66a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import java.util.List;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -97,7 +98,8 @@ public class SchemaRegistryTest {
     tryGetters(registry);
   }
 
-  static final class Provider implements SchemaProvider {
+  /** A test SchemaProvider. */
+  public static final class Provider implements SchemaProvider {
     @Override
     public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
       if (typeDescriptor.equals(TypeDescriptors.strings())) {
@@ -174,7 +176,8 @@ public class SchemaRegistryTest {
   @DefaultSchema(TestDefaultSchemaProvider.class)
   static class TestDefaultSchemaClass {}
 
-  static final class TestDefaultSchemaProvider implements SchemaProvider {
+  /** A test schema provider. */
+  public static final class TestDefaultSchemaProvider implements SchemaProvider {
     @Override
     public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
       if (typeDescriptor.equals(TypeDescriptor.of(TestDefaultSchemaClass.class))) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
index aad6f83..3dab794 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Objects;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
index 4875850..762152f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
@@ -22,10 +22,10 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
index c1f7993..0a0215c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.schemas.transforms;
 
 import com.google.common.collect.Lists;
 import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
index 4b2e323..d15e29c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
@@ -31,11 +31,11 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
index 718285a..d24ecfd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.schemas.transforms;
 
 import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
index c835ea6..5467483 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -38,11 +38,11 @@ import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
 import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.JavaBeanSchema.GetterTypeSupplier;
 import org.apache.beam.sdk.schemas.JavaBeanSchema.SetterTypeSupplier;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray;
-import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedCollectionBean;
@@ -52,66 +52,66 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
 import org.joda.time.DateTime;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 /** Tests for the {@link JavaBeanUtils} class. */
 public class JavaBeanUtilsTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
   @Test
   public void testNullable() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, GetterTypeSupplier.INSTANCE);
     assertTrue(schema.getField("str").getType().getNullable());
     assertFalse(schema.getField("anInt").getType().getNullable());
   }
 
   @Test
-  public void testMismatchingNullable() {
-    thrown.expect(RuntimeException.class);
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class);
-  }
-
-  @Test
   public void testSimpleBean() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, GetterTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedBean() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, GetterTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveArray() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            PrimitiveArrayBean.class, GetterTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedArray() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class, GetterTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedCollection() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            NestedCollectionBean.class, GetterTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveMap() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class, GetterTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedMap() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class, GetterTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema);
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
index e140a8b..697000c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
@@ -63,50 +63,56 @@ public class POJOUtilsTest {
 
   @Test
   public void testNullables() {
-    Schema schema = POJOUtils.schemaFromPojoClass(POJOWithNullables.class);
+    Schema schema =
+        POJOUtils.schemaFromPojoClass(POJOWithNullables.class, JavaFieldTypeSupplier.INSTANCE);
     assertTrue(schema.getField("str").getType().getNullable());
     assertFalse(schema.getField("anInt").getType().getNullable());
   }
 
   @Test
   public void testSimplePOJO() {
-    Schema schema = POJOUtils.schemaFromPojoClass(SimplePOJO.class);
+    Schema schema = POJOUtils.schemaFromPojoClass(SimplePOJO.class, JavaFieldTypeSupplier.INSTANCE);
     assertEquals(SIMPLE_POJO_SCHEMA, schema);
   }
 
   @Test
   public void testNestedPOJO() {
-    Schema schema = POJOUtils.schemaFromPojoClass(NestedPOJO.class);
+    Schema schema = POJOUtils.schemaFromPojoClass(NestedPOJO.class, JavaFieldTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_POJO_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveArray() {
-    Schema schema = POJOUtils.schemaFromPojoClass(PrimitiveArrayPOJO.class);
+    Schema schema =
+        POJOUtils.schemaFromPojoClass(PrimitiveArrayPOJO.class, JavaFieldTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_POJO_SCHEMA, schema);
   }
 
   @Test
   public void testNestedArray() {
-    Schema schema = POJOUtils.schemaFromPojoClass(NestedArrayPOJO.class);
+    Schema schema =
+        POJOUtils.schemaFromPojoClass(NestedArrayPOJO.class, JavaFieldTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_POJO_SCHEMA, schema);
   }
 
   @Test
   public void testNestedCollection() {
-    Schema schema = POJOUtils.schemaFromPojoClass(NestedCollectionPOJO.class);
+    Schema schema =
+        POJOUtils.schemaFromPojoClass(NestedCollectionPOJO.class, JavaFieldTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_POJO_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveMap() {
-    Schema schema = POJOUtils.schemaFromPojoClass(PrimitiveMapPOJO.class);
+    Schema schema =
+        POJOUtils.schemaFromPojoClass(PrimitiveMapPOJO.class, JavaFieldTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_POJO_SCHEMA, schema);
   }
 
   @Test
   public void testNestedMap() {
-    Schema schema = POJOUtils.schemaFromPojoClass(NestedMapPOJO.class);
+    Schema schema =
+        POJOUtils.schemaFromPojoClass(NestedMapPOJO.class, JavaFieldTypeSupplier.INSTANCE);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_POJO_SCHEMA, schema);
   }
 
@@ -128,7 +134,7 @@ public class POJOUtilsTest {
             new StringBuilder("stringBuilder"));
 
     List<FieldValueGetter> getters =
-        POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier());
+        POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, JavaFieldTypeSupplier.INSTANCE);
     assertEquals(12, getters.size());
     assertEquals("str", getters.get(0).name());
     assertEquals("field1", getters.get(0).get(simplePojo));
@@ -150,7 +156,7 @@ public class POJOUtilsTest {
   public void testGeneratedSimpleSetters() {
     SimplePOJO simplePojo = new SimplePOJO();
     List<FieldValueSetter> setters =
-        POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier());
+        POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, JavaFieldTypeSupplier.INSTANCE);
     assertEquals(12, setters.size());
 
     setters.get(0).set(simplePojo, "field1");
@@ -186,7 +192,9 @@ public class POJOUtilsTest {
 
     List<FieldValueGetter> getters =
         POJOUtils.getGetters(
-            POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier());
+            POJOWithBoxedFields.class,
+            POJO_WITH_BOXED_FIELDS_SCHEMA,
+            JavaFieldTypeSupplier.INSTANCE);
     assertEquals((byte) 41, getters.get(0).get(pojo));
     assertEquals((short) 42, getters.get(1).get(pojo));
     assertEquals((int) 43, getters.get(2).get(pojo));
@@ -199,7 +207,9 @@ public class POJOUtilsTest {
     POJOWithBoxedFields pojo = new POJOWithBoxedFields();
     List<FieldValueSetter> setters =
         POJOUtils.getSetters(
-            POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier());
+            POJOWithBoxedFields.class,
+            POJO_WITH_BOXED_FIELDS_SCHEMA,
+            JavaFieldTypeSupplier.INSTANCE);
 
     setters.get(0).set(pojo, (byte) 41);
     setters.get(1).set(pojo, (short) 42);
@@ -219,7 +229,7 @@ public class POJOUtilsTest {
     POJOWithByteArray pojo = new POJOWithByteArray();
     List<FieldValueSetter> setters =
         POJOUtils.getSetters(
-            POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, new JavaFieldTypeSupplier());
+            POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, JavaFieldTypeSupplier.INSTANCE);
     setters.get(0).set(pojo, BYTE_ARRAY);
     setters.get(1).set(pojo, BYTE_BUFFER.array());
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
index e77adf7..e73d565 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
@@ -24,10 +24,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaBeanSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 
@@ -310,6 +313,147 @@ public class TestJavaBeans {
           .addStringField("stringBuilder")
           .build();
 
+  /** A simple Bean containing basic types, annotated with the schema annotations. */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class SimpleBeanWithAnnotations {
+    private final String str;
+    private final byte aByte;
+    private final short aShort;
+    private final int anInt;
+    private final long aLong;
+    private final boolean aBoolean;
+    private final DateTime dateTime;
+    private final Instant instant;
+    private final byte[] bytes;
+    private final ByteBuffer byteBuffer;
+    private final BigDecimal bigDecimal;
+    private final StringBuilder stringBuilder;
+
+    @SchemaCreate
+    public SimpleBeanWithAnnotations(
+        String str,
+        byte aByte,
+        short aShort,
+        int anInt,
+        long aLong,
+        boolean aBoolean,
+        DateTime dateTime,
+        Instant instant,
+        byte[] bytes,
+        BigDecimal bigDecimal,
+        StringBuilder stringBuilder) {
+      this.str = str;
+      this.aByte = aByte;
+      this.aShort = aShort;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aBoolean = aBoolean;
+      this.dateTime = dateTime;
+      this.instant = instant;
+      this.bytes = bytes;
+      this.byteBuffer = ByteBuffer.wrap(bytes); // wraps bytes; there is no byteBuffer parameter
+      this.bigDecimal = bigDecimal;
+      this.stringBuilder = stringBuilder;
+    }
+
+    @SchemaIgnore
+    public String getUnknown() {
+      return "";
+    }
+
+    public String getStr() {
+      return str;
+    }
+
+    @SchemaFieldName("aByte")
+    public byte getTheByteByte() {
+      return aByte;
+    }
+
+    @SchemaFieldName("aShort")
+    public short getNotAShort() {
+      return aShort;
+    }
+
+    public int getAnInt() {
+      return anInt;
+    }
+
+    public long getaLong() {
+      return aLong;
+    }
+
+    public boolean isaBoolean() {
+      return aBoolean;
+    }
+
+    public DateTime getDateTime() {
+      return dateTime;
+    }
+
+    public byte[] getBytes() {
+      return bytes;
+    }
+
+    public ByteBuffer getByteBuffer() {
+      return byteBuffer;
+    }
+
+    public Instant getInstant() {
+      return instant;
+    }
+
+    public BigDecimal getBigDecimal() {
+      return bigDecimal;
+    }
+
+    public StringBuilder getStringBuilder() {
+      return stringBuilder;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SimpleBeanWithAnnotations that = (SimpleBeanWithAnnotations) o;
+      return aByte == that.aByte
+          && aShort == that.aShort
+          && anInt == that.anInt
+          && aLong == that.aLong
+          && aBoolean == that.aBoolean
+          && Objects.equals(str, that.str)
+          && Objects.equals(dateTime, that.dateTime)
+          && Objects.equals(instant, that.instant)
+          && Arrays.equals(bytes, that.bytes)
+          && Objects.equals(byteBuffer, that.byteBuffer)
+          && Objects.equals(bigDecimal, that.bigDecimal)
+          && Objects.equals(stringBuilder.toString(), that.stringBuilder.toString());
+    }
+
+    @Override
+    public int hashCode() {
+      int result =
+          Objects.hash(
+              str,
+              aByte,
+              aShort,
+              anInt,
+              aLong,
+              aBoolean,
+              dateTime,
+              instant,
+              byteBuffer,
+              bigDecimal,
+              stringBuilder.toString());
+      result = 31 * result + Arrays.hashCode(bytes);
+      return result;
+    }
+  }
+
   /** A Bean containing a nested class. * */
   @DefaultSchema(JavaBeanSchema.class)
   public static class NestedBean {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
index f985980..0e2a2f4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
@@ -24,10 +24,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 
@@ -101,6 +104,249 @@ public class TestPOJOs {
   public static final Schema NESTED_NULLABLE_SCHEMA =
       Schema.builder().addNullableField("nested", FieldType.row(NULLABLES_SCHEMA)).build();
 
+  /** A POJO for testing static factory methods: instances are created via {@link #of}. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class StaticCreationSimplePojo {
+    public final String str;
+    public final byte aByte;
+    public final short aShort;
+    public final int anInt;
+    public final long aLong;
+    public final boolean aBoolean;
+    public final DateTime dateTime;
+    public final Instant instant;
+    public final byte[] bytes;
+    public final ByteBuffer byteBuffer;
+    public final BigDecimal bigDecimal;
+    public final StringBuilder stringBuilder;
+
+    private StaticCreationSimplePojo(
+        String str,
+        byte aByte,
+        short aShort,
+        int anInt,
+        long aLong,
+        boolean aBoolean,
+        DateTime dateTime,
+        Instant instant,
+        byte[] bytes,
+        ByteBuffer byteBuffer,
+        BigDecimal bigDecimal,
+        StringBuilder stringBuilder) {
+      this.str = str;
+      this.aByte = aByte;
+      this.aShort = aShort;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aBoolean = aBoolean;
+      this.dateTime = dateTime;
+      this.instant = instant;
+      this.bytes = bytes;
+      this.byteBuffer = byteBuffer;
+      this.bigDecimal = bigDecimal;
+      this.stringBuilder = stringBuilder;
+    }
+
+    @SchemaCreate // parameter order below differs from the field declaration order
+    public static StaticCreationSimplePojo of(
+        String str,
+        long aLong,
+        byte aByte,
+        short aShort,
+        int anInt,
+        boolean aBoolean,
+        DateTime dateTime,
+        ByteBuffer byteBuffer,
+        Instant instant,
+        byte[] bytes,
+        BigDecimal bigDecimal,
+        StringBuilder stringBuilder) {
+      return new StaticCreationSimplePojo(
+          str,
+          aByte,
+          aShort,
+          anInt,
+          aLong,
+          aBoolean,
+          dateTime,
+          instant,
+          bytes,
+          byteBuffer,
+          bigDecimal,
+          stringBuilder);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof StaticCreationSimplePojo)) {
+        return false;
+      }
+      StaticCreationSimplePojo that = (StaticCreationSimplePojo) o;
+      return aByte == that.aByte
+          && aShort == that.aShort
+          && anInt == that.anInt
+          && aLong == that.aLong
+          && aBoolean == that.aBoolean
+          && Objects.equals(str, that.str)
+          && Objects.equals(dateTime, that.dateTime)
+          && Objects.equals(instant, that.instant)
+          && Arrays.equals(bytes, that.bytes)
+          && Objects.equals(byteBuffer, that.byteBuffer)
+          && Objects.equals(bigDecimal, that.bigDecimal)
+          && Objects.equals(stringBuilder.toString(), that.stringBuilder.toString());
+    }
+
+    @Override
+    public int hashCode() {
+      int result =
+          Objects.hash(
+              str,
+              aByte,
+              aShort,
+              anInt,
+              aLong,
+              aBoolean,
+              dateTime,
+              instant,
+              byteBuffer,
+              bigDecimal,
+              stringBuilder.toString()); // hash contents so hashCode agrees with equals
+      result = 31 * result + Arrays.hashCode(bytes);
+      return result;
+    }
+  }
+
+  /** A POJO for testing the schema annotations on fields and on a constructor. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class AnnotatedSimplePojo {
+    public final String str;
+
+    @SchemaFieldName("aByte")
+    public final byte theByte;
+
+    @SchemaFieldName("aShort")
+    public final short theShort;
+
+    public final int anInt;
+    public final long aLong;
+    public final boolean aBoolean;
+    public final DateTime dateTime;
+    public final Instant instant;
+    public final byte[] bytes;
+    public final ByteBuffer byteBuffer;
+    public final BigDecimal bigDecimal;
+    public final StringBuilder stringBuilder;
+    @SchemaIgnore public final Integer pleaseIgnore;
+
+    // Marked with SchemaCreate, so this will be called to construct instances.
+    @SchemaCreate
+    public AnnotatedSimplePojo(
+        String str,
+        byte theByte,
+        long aLong,
+        short theShort,
+        int anInt,
+        boolean aBoolean,
+        DateTime dateTime,
+        BigDecimal bigDecimal,
+        Instant instant,
+        byte[] bytes,
+        ByteBuffer byteBuffer,
+        StringBuilder stringBuilder) {
+      this.str = str;
+      this.theByte = theByte;
+      this.theShort = theShort;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aBoolean = aBoolean;
+      this.dateTime = dateTime;
+      this.instant = instant;
+      this.bytes = bytes;
+      this.byteBuffer = byteBuffer;
+      this.bigDecimal = bigDecimal;
+      this.stringBuilder = stringBuilder;
+      this.pleaseIgnore = 42; // constant; not a constructor parameter
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      AnnotatedSimplePojo that = (AnnotatedSimplePojo) o;
+      return theByte == that.theByte
+          && theShort == that.theShort
+          && anInt == that.anInt
+          && aLong == that.aLong
+          && aBoolean == that.aBoolean
+          && Objects.equals(str, that.str)
+          && Objects.equals(dateTime, that.dateTime)
+          && Objects.equals(instant, that.instant)
+          && Arrays.equals(bytes, that.bytes)
+          && Objects.equals(byteBuffer, that.byteBuffer)
+          && Objects.equals(bigDecimal, that.bigDecimal)
+          && Objects.equals(stringBuilder.toString(), that.stringBuilder.toString());
+    }
+
+    @Override
+    public int hashCode() {
+      int result =
+          Objects.hash(
+              str,
+              theByte,
+              theShort,
+              anInt,
+              aLong,
+              aBoolean,
+              dateTime,
+              instant,
+              byteBuffer,
+              bigDecimal,
+              stringBuilder.toString());
+      result = 31 * result + Arrays.hashCode(bytes);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "AnnotatedSimplePojo{"
+          + "str='"
+          + str
+          + '\''
+          + ", theByte="
+          + theByte
+          + ", theShort="
+          + theShort
+          + ", anInt="
+          + anInt
+          + ", aLong="
+          + aLong
+          + ", aBoolean="
+          + aBoolean
+          + ", dateTime="
+          + dateTime
+          + ", instant="
+          + instant
+          + ", bytes="
+          + Arrays.toString(bytes)
+          + ", byteBuffer="
+          + byteBuffer
+          + ", bigDecimal="
+          + bigDecimal
+          + ", stringBuilder="
+          + stringBuilder
+          + ", pleaseIgnore="
+          + pleaseIgnore
+          + '}';
+    }
+  }
+
   /** A simple POJO containing basic types. * */
   @DefaultSchema(JavaFieldSchema.class)
   public static class SimplePOJO {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
index dd843a0..60dc9cd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.List;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesSchema;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
index b471dc8..8aa1880 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.extensions.sql.example.model;
 
 import java.io.Serializable;
 import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 
 /** Describes a customer. */
 @DefaultSchema(JavaBeanSchema.class)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
index 6564732..999b2b8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.extensions.sql.example.model;
 
 import java.io.Serializable;
 import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 
 /** Describes an order. */
 @DefaultSchema(JavaBeanSchema.class)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java
index 829469a..7fc3d98 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java
@@ -21,9 +21,9 @@ import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
 
 import java.io.Serializable;
 import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaBeanSchema;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
index 5104f11..fe87fc9 100644
--- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
@@ -23,10 +23,10 @@ import java.sql.ResultSet;
 import java.util.Arrays;
 import java.util.Objects;
 import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
index 326154c..849c2f2 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.joda.time.Instant;
 
 /** An auction submitted by a person. */
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
index 35092d7..d06e3bd 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 
 /** Result of Query5. */
 @DefaultSchema(JavaFieldSchema.class)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
index 0a6c934..62e3386 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 
 /** Result of Query2. */
 @DefaultSchema(JavaFieldSchema.class)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
index 18feea0..217bcff 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.joda.time.Instant;
 
 /** A bid for an item on auction. */
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
index 70f2285..6013d83 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
@@ -26,8 +26,8 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 
 /**
  * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
index fe3a736..c985707 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
@@ -30,8 +30,8 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 
 /** Result of Query3. */
 @DefaultSchema(JavaFieldSchema.class)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
index c298e3f..2559f6b 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.joda.time.Instant;
 
 /** A person either creating an auction or making a bid. */