You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2019/01/08 18:06:46 UTC
[beam] branch master updated: Merge pull request #7289: [BEAM-6240]
Add a library of schema annotations for POJO and JavaBeans
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c379622 Merge pull request #7289: [BEAM-6240] Add a library of schema annotations for POJO and JavaBeans
c379622 is described below
commit c37962211fd12ad6aeada49f8eaf9b3911081568
Author: reuvenlax <re...@google.com>
AuthorDate: Tue Jan 8 10:06:37 2019 -0800
Merge pull request #7289: [BEAM-6240] Add a library of schema annotations for POJO and JavaBeans
---
.../runners/gearpump/GearpumpPipelineOptions.java | 4 +-
.../runners/gearpump/GearpumpPipelineResult.java | 2 +-
.../beam/runners/gearpump/GearpumpRunner.java | 3 +-
.../beam/sdk/schemas/FromRowUsingCreator.java | 10 +-
.../apache/beam/sdk/schemas/JavaBeanSchema.java | 88 +++++---
.../apache/beam/sdk/schemas/JavaFieldSchema.java | 76 +++++--
.../apache/beam/sdk/schemas/SchemaRegistry.java | 1 +
.../schemas/{ => annotations}/DefaultSchema.java | 7 +-
.../beam/sdk/schemas/annotations/SchemaCreate.java | 74 ++++++
.../sdk/schemas/annotations/SchemaFieldName.java | 54 +++++
.../beam/sdk/schemas/annotations/SchemaIgnore.java | 51 +++++
.../package-info.java} | 25 +--
.../apache/beam/sdk/schemas/utils/AvroUtils.java | 26 +--
.../beam/sdk/schemas/utils/ByteBuddyUtils.java | 178 +++++++++++++++
.../sdk/schemas/utils/FieldValueTypeSupplier.java | 11 +-
.../beam/sdk/schemas/utils/JavaBeanUtils.java | 112 ++++++++--
.../apache/beam/sdk/schemas/utils/POJOUtils.java | 102 +++++++--
.../beam/sdk/schemas/utils/ReflectUtils.java | 39 ++++
.../sdk/schemas/utils/StaticSchemaInference.java | 32 ++-
.../beam/sdk/schemas/JavaBeanSchemaTest.java | 64 +++++-
.../beam/sdk/schemas/JavaFieldSchemaTest.java | 70 +++++-
.../beam/sdk/schemas/SchemaRegistryTest.java | 7 +-
.../beam/sdk/schemas/transforms/CastTest.java | 2 +-
.../beam/sdk/schemas/transforms/ConvertTest.java | 2 +-
.../beam/sdk/schemas/transforms/FilterTest.java | 2 +-
.../beam/sdk/schemas/transforms/GroupTest.java | 2 +-
.../beam/sdk/schemas/transforms/SelectTest.java | 2 +-
.../beam/sdk/schemas/utils/JavaBeanUtilsTest.java | 38 ++--
.../beam/sdk/schemas/utils/POJOUtilsTest.java | 36 +--
.../beam/sdk/schemas/utils/TestJavaBeans.java | 146 +++++++++++-
.../apache/beam/sdk/schemas/utils/TestPOJOs.java | 248 ++++++++++++++++++++-
.../beam/sdk/transforms/ParDoSchemaTest.java | 2 +-
.../sdk/extensions/sql/example/model/Customer.java | 2 +-
.../sdk/extensions/sql/example/model/Order.java | 2 +-
.../extensions/sql/InferredJavaBeanSqlTest.java | 2 +-
.../beam/sdk/io/clickhouse/ClickHouseIOTest.java | 2 +-
.../org/apache/beam/sdk/nexmark/model/Auction.java | 2 +-
.../beam/sdk/nexmark/model/AuctionCount.java | 2 +-
.../beam/sdk/nexmark/model/AuctionPrice.java | 2 +-
.../org/apache/beam/sdk/nexmark/model/Bid.java | 2 +-
.../org/apache/beam/sdk/nexmark/model/Event.java | 2 +-
.../beam/sdk/nexmark/model/NameCityStateId.java | 2 +-
.../org/apache/beam/sdk/nexmark/model/Person.java | 2 +-
43 files changed, 1346 insertions(+), 192 deletions(-)
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
index eadfc44..28ff8a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-
/** Options that configure the Gearpump pipeline. */
public interface GearpumpPipelineOptions extends PipelineOptions {
@@ -45,7 +44,8 @@ public interface GearpumpPipelineOptions extends PipelineOptions {
@JsonIgnore
Map<String, String> getSerializers();
- @Description("Whether the pipeline will be run on a remote cluster. If false, it will be run on a EmbeddedCluster")
+ @Description(
+ "Whether the pipeline will be run on a remote cluster. If false, it will be run on a EmbeddedCluster")
void setRemote(Boolean remote);
@Default.Boolean(true)
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 7f9b0be..7e6a4c3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -1,4 +1,4 @@
- /*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 3181b9c..2aca2ca 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -70,7 +70,8 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment());
} else {
RuntimeEnvironment.setRuntimeEnv(new EmbeddedRuntimeEnvironment());
- config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
+ config =
+ config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
}
ClientContext clientContext = ClientContext.apply(config);
options.setClientContext(clientContext);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 139281f..79f5afe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.schemas;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
@@ -55,8 +56,11 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
public <ValueT> ValueT fromRow(
Row row, Class<ValueT> clazz, Factory<List<FieldValueTypeInformation>> typeFactory) {
if (row instanceof RowWithGetters) {
- // Efficient path: simply extract the underlying object instead of creating a new one.
- return (ValueT) ((RowWithGetters) row).getGetterTarget();
+ Object target = ((RowWithGetters) row).getGetterTarget();
+ if (target.getClass().equals(clazz)) {
+ // Efficient path: simply extract the underlying object instead of creating a new one.
+ return (ValueT) target;
+ }
}
Object[] params = new Object[row.getFieldCount()];
@@ -68,7 +72,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
for (int i = 0; i < row.getFieldCount(); ++i) {
FieldType type = schema.getField(i).getType();
- FieldValueTypeInformation typeInformation = typeInformations.get(i);
+ FieldValueTypeInformation typeInformation = checkNotNull(typeInformations.get(i));
params[i] =
fromValue(
type,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 8eb8022..b7e9dda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -18,12 +18,14 @@
package org.apache.beam.sdk.schemas;
import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
@@ -47,19 +49,20 @@ public class JavaBeanSchema extends GetterBasedSchemaProvider {
/** {@link FieldValueTypeSupplier} that's based on getter methods. */
@VisibleForTesting
public static class GetterTypeSupplier implements FieldValueTypeSupplier {
+ public static final GetterTypeSupplier INSTANCE = new GetterTypeSupplier();
+
@Override
- public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
- Map<String, FieldValueTypeInformation> types =
- ReflectUtils.getMethods(clazz)
- .stream()
- .filter(ReflectUtils::isGetter)
- .map(FieldValueTypeInformation::forGetter)
- .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
- // Return the list ordered by the schema fields.
- return schema
- .getFields()
+ public List<FieldValueTypeInformation> get(Class<?> clazz) {
+ return ReflectUtils.getMethods(clazz)
.stream()
- .map(f -> types.get(f.getName()))
+ .filter(ReflectUtils::isGetter)
+ .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
+ .map(FieldValueTypeInformation::forGetter)
+ .map(
+ t -> {
+ SchemaFieldName fieldName = t.getMethod().getAnnotation(SchemaFieldName.class);
+ return (fieldName != null) ? t.withName(fieldName.value()) : t;
+ })
.collect(Collectors.toList());
}
}
@@ -67,50 +70,77 @@ public class JavaBeanSchema extends GetterBasedSchemaProvider {
/** {@link FieldValueTypeSupplier} that's based on setter methods. */
@VisibleForTesting
public static class SetterTypeSupplier implements FieldValueTypeSupplier {
+ private static final SetterTypeSupplier INSTANCE = new SetterTypeSupplier();
+
@Override
- public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
- Map<String, FieldValueTypeInformation> types =
- ReflectUtils.getMethods(clazz)
- .stream()
- .filter(ReflectUtils::isSetter)
- .map(FieldValueTypeInformation::forSetter)
- .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
- // Return the list ordered by the schema fields.
- return schema
- .getFields()
+ public List<FieldValueTypeInformation> get(Class<?> clazz) {
+ return ReflectUtils.getMethods(clazz)
.stream()
- .map(f -> types.get(f.getName()))
+ .filter(ReflectUtils::isSetter)
+ .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
+ .map(FieldValueTypeInformation::forSetter)
.collect(Collectors.toList());
}
}
@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
- return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType());
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(
+ typeDescriptor.getRawType(), GetterTypeSupplier.INSTANCE);
+
+ // If there are no creator methods, then validate that we have setters for every field.
+ // Otherwise, we will have not way of creating the class.
+ if (ReflectUtils.getAnnotatedCreateMethod(typeDescriptor.getRawType()) == null
+ && ReflectUtils.getAnnotatedConstructor(typeDescriptor.getRawType()) == null) {
+ JavaBeanUtils.validateJavaBean(
+ GetterTypeSupplier.INSTANCE.get(typeDescriptor.getRawType(), schema),
+ SetterTypeSupplier.INSTANCE.get(typeDescriptor.getRawType(), schema));
+ }
+ return schema;
}
@Override
public FieldValueGetterFactory fieldValueGetterFactory() {
return (Class<?> targetClass, Schema schema) ->
- JavaBeanUtils.getGetters(targetClass, schema, new GetterTypeSupplier());
+ JavaBeanUtils.getGetters(targetClass, schema, GetterTypeSupplier.INSTANCE);
}
@Override
UserTypeCreatorFactory schemaTypeCreatorFactory() {
- return new SetterBasedCreatorFactory(new JavaBeanSetterFactory());
+ UserTypeCreatorFactory setterBasedFactory =
+ new SetterBasedCreatorFactory(new JavaBeanSetterFactory());
+
+ return (Class<?> targetClass, Schema schema) -> {
+ // If a static method is marked with @SchemaCreate, use that.
+ Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
+ if (annotated != null) {
+ return JavaBeanUtils.getStaticCreator(
+ targetClass, annotated, schema, GetterTypeSupplier.INSTANCE);
+ }
+
+ // If a Constructor was tagged with @SchemaCreate, invoke that constructor.
+ Constructor<?> constructor = ReflectUtils.getAnnotatedConstructor(targetClass);
+ if (constructor != null) {
+ return JavaBeanUtils.getConstructorCreator(
+ targetClass, constructor, schema, GetterTypeSupplier.INSTANCE);
+ }
+
+ return setterBasedFactory.create(targetClass, schema);
+ };
}
@Override
public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
return (Class<?> targetClass, Schema schema) ->
- JavaBeanUtils.getFieldTypes(targetClass, schema, new GetterTypeSupplier());
+ JavaBeanUtils.getFieldTypes(targetClass, schema, GetterTypeSupplier.INSTANCE);
}
/** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */
public static class JavaBeanSetterFactory implements FieldValueSetterFactory {
@Override
public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
- return JavaBeanUtils.getSetters(targetClass, schema, new SetterTypeSupplier());
+ return JavaBeanUtils.getSetters(targetClass, schema, SetterTypeSupplier.INSTANCE);
}
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index 1504717..73e8dac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -18,12 +18,17 @@
package org.apache.beam.sdk.schemas;
import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
+import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.POJOUtils;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
@@ -47,42 +52,81 @@ public class JavaFieldSchema extends GetterBasedSchemaProvider {
/** {@link FieldValueTypeSupplier} that's based on public fields. */
@VisibleForTesting
public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier {
+ public static final JavaFieldTypeSupplier INSTANCE = new JavaFieldTypeSupplier();
+
@Override
- public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
- Map<String, FieldValueTypeInformation> types =
+ public List<FieldValueTypeInformation> get(Class<?> clazz) {
+ List<FieldValueTypeInformation> types =
ReflectUtils.getFields(clazz)
.stream()
+ .filter(f -> !f.isAnnotationPresent(SchemaIgnore.class))
.map(FieldValueTypeInformation::forField)
- .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
- // Return the list ordered by the schema fields.
- return schema
- .getFields()
- .stream()
- .map(f -> types.get(f.getName()))
- .collect(Collectors.toList());
+ .map(
+ t -> {
+ SchemaFieldName fieldName = t.getField().getAnnotation(SchemaFieldName.class);
+ return (fieldName != null) ? t.withName(fieldName.value()) : t;
+ })
+ .collect(Collectors.toList());
+
+ // If there are no creators registered, then make sure none of the schema fields are final,
+ // as we (currently) have no way of creating classes in this case.
+ if (ReflectUtils.getAnnotatedCreateMethod(clazz) == null
+ && ReflectUtils.getAnnotatedConstructor(clazz) == null) {
+ Optional<Field> finalField =
+ types
+ .stream()
+ .map(FieldValueTypeInformation::getField)
+ .filter(f -> Modifier.isFinal(f.getModifiers()))
+ .findAny();
+ if (finalField.isPresent()) {
+ throw new IllegalArgumentException(
+ "Class "
+ + clazz
+ + " has final fields and no "
+ + "registered creator. Cannot use as schema, as we don't know how to create this "
+ + "object automatically");
+ }
+ }
+ return types;
}
}
@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
- return POJOUtils.schemaFromPojoClass(typeDescriptor.getRawType());
+ return POJOUtils.schemaFromPojoClass(
+ typeDescriptor.getRawType(), JavaFieldTypeSupplier.INSTANCE);
}
@Override
public FieldValueGetterFactory fieldValueGetterFactory() {
return (Class<?> targetClass, Schema schema) ->
- POJOUtils.getGetters(targetClass, schema, new JavaFieldTypeSupplier());
+ POJOUtils.getGetters(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
}
@Override
public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
return (Class<?> targetClass, Schema schema) ->
- POJOUtils.getFieldTypes(targetClass, schema, new JavaFieldTypeSupplier());
+ POJOUtils.getFieldTypes(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
}
@Override
UserTypeCreatorFactory schemaTypeCreatorFactory() {
- return (Class<?> targetClass, Schema schema) ->
- POJOUtils.getCreator(targetClass, schema, new JavaFieldTypeSupplier());
+ return (Class<?> targetClass, Schema schema) -> {
+ // If a static method is marked with @SchemaCreate, use that.
+ Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
+ if (annotated != null) {
+ return POJOUtils.getStaticCreator(
+ targetClass, annotated, schema, JavaFieldTypeSupplier.INSTANCE);
+ }
+
+ // If a Constructor was tagged with @SchemaCreate, invoke that constructor.
+ Constructor<?> constructor = ReflectUtils.getAnnotatedConstructor(targetClass);
+ if (constructor != null) {
+ return POJOUtils.getConstructorCreator(
+ targetClass, constructor, schema, JavaFieldTypeSupplier.INSTANCE);
+ }
+
+ return POJOUtils.getSetFieldCreator(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
+ };
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java
index a38af7e..a2daaa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java
similarity index 94%
rename from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java
index c5cc20e..a364b8c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.schemas;
+package org.apache.beam.sdk.schemas.annotations;
import static com.google.common.base.Preconditions.checkArgument;
@@ -33,6 +33,9 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.SchemaProviderRegistrar;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -97,7 +100,7 @@ public @interface DefaultSchema {
+ " specified as the default SchemaProvider for type "
+ type
+ ". Make "
- + " sure that this class has a default constructor.",
+ + " sure that this class has a public default constructor.",
e);
}
});
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaCreate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaCreate.java
new file mode 100644
index 0000000..a3c51b2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaCreate.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Can be put on a constructor or a static method, in which case that constructor or method will be
+ * used to created instance of the class by Beam's schema code.
+ *
+ * <p>For example, the following Java POJO.
+ *
+ * <pre><code>
+ * {@literal @}DefaultSchema(JavaBeanSchema.class)
+ * class MyClass {
+ * public final String user;
+ * public final int age;
+ *
+ * {@literal @}SchemaCreate
+ * public MyClass(String user, int age) {
+ * this.user = user;
+ * this.age = age;
+ * }
+ * }
+ * </code></pre>
+ *
+ * <p>This tells Beam that this constructor can be used to construct instances. Beam will match up
+ * the names of the constructor arguments to schema fields in order to decide how to create the
+ * class from a Row.
+ *
+ * <p>This can also be used to annotate a static factory method on the class. For example:
+ *
+ * <pre><code>
+ * {@literal @}DefaultSchema(JavaBeanSchema.class)
+ * class MyClass {
+ * public final String user;
+ * public final int age;
+ *
+ * private MyClass(String user, int age) { this.user = user; this.age = age; }
+ *
+ * {@literal @}SchemaCreate
+ * public static MyClass create(String user, int age) {
+ * return new MyClass(user, age);
+ * }
+ * }
+ * </code></pre>
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.CONSTRUCTOR, ElementType.METHOD})
+@SuppressWarnings("rawtypes")
+@Experimental(Kind.SCHEMAS)
+public @interface SchemaCreate {}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldName.java
new file mode 100644
index 0000000..88c1de8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaFieldName.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * When used on a POJO field or a JavaBean getter, the specified name is used for the generated
+ * schema field.
+ *
+ * <p>For example, a Java POJO with a field that we want in our schema but under a different name.
+ *
+ * <pre><code>
+ * {@literal @}DefaultSchema(JavaBeanSchema.class)
+ * class MyClass {
+ * public String user;
+ * {@literal @}SchemaFieldName("age")
+ * public int ageInYears;
+ * }
+ * </code></pre>
+ *
+ * <p>The resulting schema will have fields named "user" and "age."
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+@SuppressWarnings("rawtypes")
+@Experimental(Kind.SCHEMAS)
+public @interface SchemaFieldName {
+ @Nonnull
+ String value();
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaIgnore.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaIgnore.java
new file mode 100644
index 0000000..b7419d5
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/SchemaIgnore.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * When used on a POJO field or a JavaBean getter, that field or getter is ignored from the inferred
+ * schema.
+ *
+ * <p>For example, a Java POJO with a field that we don't want included in the schema.
+ *
+ * <pre><code>
+ * {@literal @}DefaultSchema(JavaBeanSchema.class)
+ * class MyClass {
+ * public String user;
+ * public int age;
+ * {@literal @}SchemaIgnore public String pleaseDontAddToSchema;
+ * }
+ * </code></pre>
+ *
+ * <p>In this case, the pleaseDontAddToSchema will be excluded from the schema, and implicitly
+ * dropped from calculations.
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+@SuppressWarnings("rawtypes")
+@Experimental(Kind.SCHEMAS)
+public @interface SchemaIgnore {}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/package-info.java
similarity index 57%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/package-info.java
index 12330ec..99a5a9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/package-info.java
@@ -15,21 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.schemas.utils;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.Schema;
-
/**
- * A naming policy for schema fields. This maps a name from the class (field name or getter name) to
- * the matching field name in the schema.
+ * Defines {@link org.apache.beam.sdk.schemas.Schema} and other classes for representing schema'd
+ * data in a {@link org.apache.beam.sdk.Pipeline}.
+ *
+ * <p>For further details, see the documentation for each class in this package.
*/
-public interface FieldValueTypeSupplier extends Serializable {
- /**
- * Return all the FieldValueTypeInformations. The returned list must be in the same order as
- * fields in the schema.
- */
- List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema);
-}
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.schemas.annotations;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 43e6a5c..177ff64 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -302,26 +302,27 @@ public class AvroUtils {
private static final class AvroSpecificRecordFieldValueTypeSupplier
implements FieldValueTypeSupplier {
@Override
+ public List<FieldValueTypeInformation> get(Class<?> clazz) {
+ throw new RuntimeException("Unexpected call.");
+ }
+
+ @Override
public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
Map<String, String> mapping = getMapping(schema);
- Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
+ List<FieldValueTypeInformation> types = Lists.newArrayList();
for (Method method : ReflectUtils.getMethods(clazz)) {
if (ReflectUtils.isGetter(method)) {
FieldValueTypeInformation fieldValueTypeInformation =
FieldValueTypeInformation.forGetter(method);
String name = mapping.get(fieldValueTypeInformation.getName());
if (name != null) {
- types.put(name, fieldValueTypeInformation.withName(name));
+ types.add(fieldValueTypeInformation.withName(name));
}
}
}
// Return the list ordered by the schema fields.
- return schema
- .getFields()
- .stream()
- .map(f -> types.get(f.getName()))
- .collect(Collectors.toList());
+ return StaticSchemaInference.sortBySchema(types, schema);
}
private Map<String, String> getMapping(Schema schema) {
@@ -340,7 +341,7 @@ public class AvroUtils {
private static final class AvroPojoFieldValueTypeSupplier implements FieldValueTypeSupplier {
@Override
- public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+ public List<FieldValueTypeInformation> get(Class<?> clazz) {
Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
for (java.lang.reflect.Field f : ReflectUtils.getFields(clazz)) {
if (!f.isAnnotationPresent(AvroIgnore.class)) {
@@ -352,12 +353,7 @@ public class AvroUtils {
types.put(typeInformation.getName(), typeInformation);
}
}
- // Return the list ordered by the schema fields.
- return schema
- .getFields()
- .stream()
- .map(f -> types.get(f.getName()))
- .collect(Collectors.toList());
+ return Lists.newArrayList(types.values());
}
}
@@ -386,7 +382,7 @@ public class AvroUtils {
if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) {
return AvroByteBuddyUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema);
} else {
- return POJOUtils.getCreator(clazz, schema, new AvroPojoFieldValueTypeSupplier());
+ return POJOUtils.getSetFieldCreator(clazz, schema, new AvroPojoFieldValueTypeSupplier());
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index a0d40c8..a062496 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -17,6 +17,14 @@
*/
package org.apache.beam.sdk.schemas.utils;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Parameter;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -27,9 +35,15 @@ import java.util.Map;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.NamingStrategy;
import net.bytebuddy.NamingStrategy.SuffixingRandom.BaseNameResolver;
+import net.bytebuddy.description.method.MethodDescription.ForLoadedConstructor;
+import net.bytebuddy.description.method.MethodDescription.ForLoadedMethod;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
import net.bytebuddy.implementation.bytecode.Duplication;
import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
@@ -37,13 +51,18 @@ import net.bytebuddy.implementation.bytecode.TypeCreation;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
import net.bytebuddy.implementation.bytecode.collection.ArrayFactory;
+import net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
import net.bytebuddy.matcher.ElementMatchers;
import net.bytebuddy.utility.RandomString;
import org.apache.avro.generic.GenericFixed;
import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
import org.apache.commons.lang3.ArrayUtils;
@@ -595,4 +614,163 @@ class ByteBuddyUtils {
return readValue;
}
}
+
+ /**
+ * Invokes a constructor registered using SchemaCreate. As constructor parameters might not be in
+ * the same order as the schema fields, reorders the parameters as necessary before calling the
+ * constructor.
+ */
+ static class ConstructorCreateInstruction extends InvokeUserCreateInstruction {
+ private final Constructor constructor;
+
+ ConstructorCreateInstruction(
+ List<FieldValueTypeInformation> fields, Class targetClass, Constructor constructor) {
+ super(fields, targetClass, Lists.newArrayList(constructor.getParameters()));
+ this.constructor = constructor;
+ }
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ protected StackManipulation beforePushingParameters() {
+ // Create the target class.
+ ForLoadedType loadedType = new ForLoadedType(targetClass);
+ return new StackManipulation.Compound(TypeCreation.of(loadedType), Duplication.SINGLE);
+ }
+
+ @Override
+ protected StackManipulation afterPushingParameters() {
+ return MethodInvocation.invoke(new ForLoadedConstructor(constructor));
+ }
+ }
+
+ /**
+ * Invokes a static factory method registered using SchemaCreate. As the method parameters might
+ * not be in the same order as the schema fields, reorders the parameters as necessary before
+ * calling the constructor.
+ */
+ static class StaticFactoryMethodInstruction extends InvokeUserCreateInstruction {
+ private final Method creator;
+
+ StaticFactoryMethodInstruction(
+ List<FieldValueTypeInformation> fields, Class targetClass, Method creator) {
+ super(fields, targetClass, Lists.newArrayList(creator.getParameters()));
+ if (!Modifier.isStatic(creator.getModifiers())) {
+ throw new IllegalArgumentException("Method " + creator + " is not static");
+ }
+ this.creator = creator;
+ }
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ protected StackManipulation afterPushingParameters() {
+ return MethodInvocation.invoke(new ForLoadedMethod(creator));
+ }
+ }
+
+ static class InvokeUserCreateInstruction implements Implementation {
+ protected final List<FieldValueTypeInformation> fields;
+ protected final Class targetClass;
+ protected final List<Parameter> parameters;
+ protected final Map<Integer, Integer> fieldMapping;
+
+ protected InvokeUserCreateInstruction(
+ List<FieldValueTypeInformation> fields, Class targetClass, List<Parameter> parameters) {
+ this.fields = fields;
+ this.targetClass = targetClass;
+ this.parameters = parameters;
+
+ // Method parameters might not be in the same order as the schema fields, and the input
+ // array to SchemaUserTypeCreator.create is in schema order. Examine the parameter names
+ // and compare against field names to calculate the mapping between the two lists.
+ Map<String, Integer> fieldsByLogicalName = Maps.newHashMap();
+ Map<String, Integer> fieldsByJavaClassMember = Maps.newHashMap();
+ for (int i = 0; i < fields.size(); ++i) {
+ // Method parameters are allowed to either correspond to the schema field names or to the
+ // actual Java field or method names.
+ FieldValueTypeInformation fieldValue = checkNotNull(fields.get(i));
+ fieldsByLogicalName.put(fieldValue.getName(), i);
+ if (fieldValue.getField() != null) {
+ fieldsByJavaClassMember.put(fieldValue.getField().getName(), i);
+ } else if (fieldValue.getMethod() != null) {
+ String name = ReflectUtils.stripPrefix(fieldValue.getMethod().getName(), "set");
+ fieldsByJavaClassMember.put(name, i);
+ }
+ }
+
+ fieldMapping = Maps.newHashMap();
+ for (int i = 0; i < parameters.size(); ++i) {
+ Parameter parameter = parameters.get(i);
+ String paramName = parameter.getName();
+ Integer index = fieldsByLogicalName.get(paramName);
+ if (index == null) {
+ index = fieldsByJavaClassMember.get(paramName);
+ }
+ if (index == null) {
+ throw new RuntimeException(
+ "Creator parameter " + paramName + " Doesn't correspond to a schema field");
+ }
+ fieldMapping.put(i, index);
+ }
+ }
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return (methodVisitor, implementationContext, instrumentedMethod) -> {
+ // this + method parameters.
+ int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+ StackManipulation stackManipulation = beforePushingParameters();
+
+ // Push all creator parameters on the stack.
+ ConvertType convertType = new ConvertType(true);
+ for (int i = 0; i < parameters.size(); i++) {
+ Parameter parameter = parameters.get(i);
+ ForLoadedType convertedType =
+ new ForLoadedType(
+ (Class) convertType.convert(TypeDescriptor.of(parameter.getType())));
+
+ // The instruction to read the parameter. Use the fieldMapping to reorder parameters as
+ // necessary.
+ StackManipulation readParameter =
+ new StackManipulation.Compound(
+ MethodVariableAccess.REFERENCE.loadFrom(1),
+ IntegerConstant.forValue(fieldMapping.get(i)),
+ ArrayAccess.REFERENCE.load(),
+ TypeCasting.to(convertedType));
+ stackManipulation =
+ new StackManipulation.Compound(
+ stackManipulation,
+ new ConvertValueForSetter(readParameter)
+ .convert(TypeDescriptor.of(parameter.getType())));
+ }
+ stackManipulation =
+ new StackManipulation.Compound(
+ stackManipulation, afterPushingParameters(), MethodReturn.REFERENCE);
+
+ StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+ return new Size(size.getMaximalSize(), numLocals);
+ };
+ }
+
+ protected StackManipulation beforePushingParameters() {
+ return new StackManipulation.Compound();
+ }
+
+ protected StackManipulation afterPushingParameters() {
+ return new StackManipulation.Compound();
+ }
+ }
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
index 12330ec..d93456b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
@@ -27,9 +27,16 @@ import org.apache.beam.sdk.schemas.Schema;
* the matching field name in the schema.
*/
public interface FieldValueTypeSupplier extends Serializable {
+ /** Return all the FieldValueTypeInformations. */
+ List<FieldValueTypeInformation> get(Class<?> clazz);
+
/**
- * Return all the FieldValueTypeInformations. The returned list must be in the same order as
+ * Return all the FieldValueTypeInformations.
+ *
+ * <p>If the schema parameter is not null, then the returned list must be in the same order as
* fields in the schema.
*/
- List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema);
+ default List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+ return StaticSchemaInference.sortBySchema(get(clazz), schema);
+ }
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 58cc0ed..bb9a76c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.schemas.utils;
import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
@@ -44,8 +45,11 @@ import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.FieldValueSetter;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConstructorCreateInstruction;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.StaticFactoryMethodInstruction;
import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -53,33 +57,21 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
@Experimental(Kind.SCHEMAS)
public class JavaBeanUtils {
/** Create a {@link Schema} for a Java Bean class. */
- public static Schema schemaFromJavaBeanClass(Class<?> clazz) {
- return StaticSchemaInference.schemaFromClass(clazz, JavaBeanUtils::typeInformationFromClass);
+ public static Schema schemaFromJavaBeanClass(
+ Class<?> clazz, FieldValueTypeSupplier fieldValueTypeSupplier) {
+ return StaticSchemaInference.schemaFromClass(clazz, fieldValueTypeSupplier);
}
- private static List<FieldValueTypeInformation> typeInformationFromClass(Class<?> clazz) {
- List<FieldValueTypeInformation> getterTypes =
- ReflectUtils.getMethods(clazz)
- .stream()
- .filter(ReflectUtils::isGetter)
- .map(FieldValueTypeInformation::forGetter)
- .collect(Collectors.toList());
-
- Map<String, FieldValueTypeInformation> setterTypes =
- ReflectUtils.getMethods(clazz)
+ // Make sure that there are matching setters and getters.
+ public static void validateJavaBean(
+ List<FieldValueTypeInformation> getters, List<FieldValueTypeInformation> setters) {
+ Map<String, FieldValueTypeInformation> setterMap =
+ setters
.stream()
- .filter(ReflectUtils::isSetter)
- .map(FieldValueTypeInformation::forSetter)
.collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
- validateJavaBean(getterTypes, setterTypes);
- return getterTypes;
- }
- // Make sure that there are matching setters and getters.
- private static void validateJavaBean(
- List<FieldValueTypeInformation> getters, Map<String, FieldValueTypeInformation> setters) {
for (FieldValueTypeInformation type : getters) {
- FieldValueTypeInformation setterType = setters.get(type.getName());
+ FieldValueTypeInformation setterType = setterMap.get(type.getName());
if (setterType == null) {
throw new RuntimeException(
"JavaBean contained a getter for field "
@@ -218,6 +210,84 @@ public class JavaBeanUtils {
.intercept(new InvokeSetterInstruction(method));
}
+ // The list of constructors for a class is cached, so we only create the classes the first time
+ // getConstructor is called.
+ public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS =
+ Maps.newConcurrentMap();
+
+ public static SchemaUserTypeCreator getConstructorCreator(
+ Class clazz,
+ Constructor constructor,
+ Schema schema,
+ FieldValueTypeSupplier fieldValueTypeSupplier) {
+ return CACHED_CREATORS.computeIfAbsent(
+ new ClassWithSchema(clazz, schema),
+ c -> {
+ List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+ return createConstructorCreator(clazz, constructor, schema, types);
+ });
+ }
+
+ public static <T> SchemaUserTypeCreator createConstructorCreator(
+ Class<T> clazz,
+ Constructor<T> constructor,
+ Schema schema,
+ List<FieldValueTypeInformation> types) {
+ try {
+ DynamicType.Builder<SchemaUserTypeCreator> builder =
+ BYTE_BUDDY
+ .subclass(SchemaUserTypeCreator.class)
+ .method(ElementMatchers.named("create"))
+ .intercept(new ConstructorCreateInstruction(types, clazz, constructor));
+ return builder
+ .make()
+ .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded()
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException(
+ "Unable to generate a creator for class " + clazz + " with schema " + schema);
+ }
+ }
+
+ public static SchemaUserTypeCreator getStaticCreator(
+ Class clazz, Method creator, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
+ return CACHED_CREATORS.computeIfAbsent(
+ new ClassWithSchema(clazz, schema),
+ c -> {
+ List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+ return createStaticCreator(clazz, creator, schema, types);
+ });
+ }
+
+ public static <T> SchemaUserTypeCreator createStaticCreator(
+ Class<T> clazz, Method creator, Schema schema, List<FieldValueTypeInformation> types) {
+ try {
+ DynamicType.Builder<SchemaUserTypeCreator> builder =
+ BYTE_BUDDY
+ .subclass(SchemaUserTypeCreator.class)
+ .method(ElementMatchers.named("create"))
+ .intercept(new StaticFactoryMethodInstruction(types, clazz, creator));
+
+ return builder
+ .make()
+ .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded()
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException(
+ "Unable to generate a creator for " + clazz + " with schema " + schema);
+ }
+ }
+
// Implements a method to read a public getter out of an object.
private static class InvokeGetterInstruction implements Implementation {
private final FieldValueTypeInformation typeInformation;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index 2c22490..07c5915 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -18,11 +18,12 @@
package org.apache.beam.sdk.schemas.utils;
import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.bytebuddy.ByteBuddy;
@@ -53,9 +54,11 @@ import org.apache.beam.sdk.schemas.FieldValueSetter;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConstructorCreateInstruction;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.InjectPackageStrategy;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.StaticFactoryMethodInstruction;
import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -63,14 +66,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/** A set of utilities yo generate getter and setter classes for POJOs. */
@Experimental(Kind.SCHEMAS)
public class POJOUtils {
- public static Schema schemaFromPojoClass(Class<?> clazz) {
- Function<Class, List<FieldValueTypeInformation>> getTypesForClass =
- c ->
- ReflectUtils.getFields(c)
- .stream()
- .map(FieldValueTypeInformation::forField)
- .collect(Collectors.toList());
- return StaticSchemaInference.schemaFromClass(clazz, getTypesForClass);
+ public static Schema schemaFromPojoClass(
+ Class<?> clazz, FieldValueTypeSupplier fieldValueTypeSupplier) {
+ return StaticSchemaInference.schemaFromClass(clazz, fieldValueTypeSupplier);
}
// Static ByteBuddy instance used by all helpers.
@@ -112,17 +110,17 @@ public class POJOUtils {
public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS =
Maps.newConcurrentMap();
- public static <T> SchemaUserTypeCreator getCreator(
+ public static <T> SchemaUserTypeCreator getSetFieldCreator(
Class<T> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
return CACHED_CREATORS.computeIfAbsent(
new ClassWithSchema(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
- return createCreator(clazz, schema, types);
+ return createSetFieldCreator(clazz, schema, types);
});
}
- private static <T> SchemaUserTypeCreator createCreator(
+ private static <T> SchemaUserTypeCreator createSetFieldCreator(
Class<T> clazz, Schema schema, List<FieldValueTypeInformation> types) {
// Get the list of class fields ordered by schema.
List<Field> fields =
@@ -133,7 +131,81 @@ public class POJOUtils {
.with(new InjectPackageStrategy(clazz))
.subclass(SchemaUserTypeCreator.class)
.method(ElementMatchers.named("create"))
- .intercept(new CreateInstruction(fields, clazz));
+ .intercept(new SetFieldCreateInstruction(fields, clazz));
+
+ return builder
+ .make()
+ .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded()
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException(
+ "Unable to generate a creator for " + clazz + " with schema " + schema);
+ }
+ }
+
+ public static SchemaUserTypeCreator getConstructorCreator(
+ Class clazz,
+ Constructor constructor,
+ Schema schema,
+ FieldValueTypeSupplier fieldValueTypeSupplier) {
+ return CACHED_CREATORS.computeIfAbsent(
+ new ClassWithSchema(clazz, schema),
+ c -> {
+ List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+ return createConstructorCreator(clazz, constructor, schema, types);
+ });
+ }
+
+ public static <T> SchemaUserTypeCreator createConstructorCreator(
+ Class<T> clazz,
+ Constructor<T> constructor,
+ Schema schema,
+ List<FieldValueTypeInformation> types) {
+ try {
+ DynamicType.Builder<SchemaUserTypeCreator> builder =
+ BYTE_BUDDY
+ .subclass(SchemaUserTypeCreator.class)
+ .method(ElementMatchers.named("create"))
+ .intercept(new ConstructorCreateInstruction(types, clazz, constructor));
+
+ return builder
+ .make()
+ .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded()
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException(
+ "Unable to generate a creator for " + clazz + " with schema " + schema);
+ }
+ }
+
+ public static SchemaUserTypeCreator getStaticCreator(
+ Class clazz, Method creator, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
+ return CACHED_CREATORS.computeIfAbsent(
+ new ClassWithSchema(clazz, schema),
+ c -> {
+ List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+ return createStaticCreator(clazz, creator, schema, types);
+ });
+ }
+
+ public static <T> SchemaUserTypeCreator createStaticCreator(
+ Class<T> clazz, Method creator, Schema schema, List<FieldValueTypeInformation> types) {
+ try {
+ DynamicType.Builder<SchemaUserTypeCreator> builder =
+ BYTE_BUDDY
+ .subclass(SchemaUserTypeCreator.class)
+ .method(ElementMatchers.named("create"))
+ .intercept(new StaticFactoryMethodInstruction(types, clazz, creator));
return builder
.make()
@@ -345,11 +417,11 @@ public class POJOUtils {
}
// Implements a method to construct an object.
- static class CreateInstruction implements Implementation {
+ static class SetFieldCreateInstruction implements Implementation {
private final List<Field> fields;
private final Class pojoClass;
- CreateInstruction(List<Field> fields, Class pojoClass) {
+ SetFieldCreateInstruction(List<Field> fields, Class pojoClass) {
this.fields = fields;
this.pojoClass = pojoClass;
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
index 26b3d70..300b8f11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
@@ -21,16 +21,20 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
/** A set of reflection helper methods. */
public class ReflectUtils {
@@ -62,6 +66,7 @@ public class ReflectUtils {
}
private static final Map<Class, List<Method>> DECLARED_METHODS = Maps.newHashMap();
+ private static final Map<Class, Method> ANNOTATED_CONSTRUCTORS = Maps.newHashMap();
private static final Map<Class, List<Field>> DECLARED_FIELDS = Maps.newHashMap();
/** Returns the list of public, non-static methods in the class, caching the results. */
@@ -77,6 +82,40 @@ public class ReflectUtils {
});
}
+ @Nullable
+ public static Constructor getAnnotatedConstructor(Class clazz) {
+ return Arrays.stream(clazz.getDeclaredConstructors())
+ .filter(m -> !Modifier.isPrivate(m.getModifiers()))
+ .filter(m -> !Modifier.isProtected(m.getModifiers()))
+ .filter(m -> m.getAnnotation(SchemaCreate.class) != null)
+ .findFirst()
+ .orElse(null);
+ }
+
+ @Nullable
+ public static Method getAnnotatedCreateMethod(Class clazz) {
+ return ANNOTATED_CONSTRUCTORS.computeIfAbsent(
+ clazz,
+ c -> {
+ Method method =
+ Arrays.stream(clazz.getDeclaredMethods())
+ .filter(m -> !Modifier.isPrivate(m.getModifiers()))
+ .filter(m -> !Modifier.isProtected(m.getModifiers()))
+ .filter(m -> Modifier.isStatic(m.getModifiers()))
+ .filter(m -> m.getAnnotation(SchemaCreate.class) != null)
+ .findFirst()
+ .orElse(null);
+ if (method != null && !clazz.isAssignableFrom(method.getReturnType())) {
+ throw new InvalidParameterException(
+ "A method marked with SchemaCreate in class "
+ + clazz
+ + " does not return a type assignable to "
+ + clazz);
+ }
+ return method;
+ });
+ }
+
// Get all public, non-static, non-transient fields.
public static List<Field> getFields(Class<?> clazz) {
return DECLARED_FIELDS.computeIfAbsent(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index d3f81e8..799b7d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -35,6 +36,19 @@ import org.joda.time.ReadableInstant;
/** A set of utilities for inferring a Beam {@link Schema} from static Java types. */
public class StaticSchemaInference {
+ public static List<FieldValueTypeInformation> sortBySchema(
+ List<FieldValueTypeInformation> types, Schema schema) {
+ Map<String, FieldValueTypeInformation> typeMap =
+ types
+ .stream()
+ .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
+ return schema
+ .getFields()
+ .stream()
+ .map(f -> typeMap.get(f.getName()))
+ .collect(Collectors.toList());
+ }
+
enum MethodType {
GETTER,
SETTER
@@ -67,10 +81,10 @@ public class StaticSchemaInference {
* public getter methods, or special annotations on the class.
*/
public static Schema schemaFromClass(
- Class<?> clazz, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) {
+ Class<?> clazz, FieldValueTypeSupplier fieldValueTypeSupplier) {
Schema.Builder builder = Schema.builder();
- for (FieldValueTypeInformation type : getTypesForClass.apply(clazz)) {
- Schema.FieldType fieldType = fieldFromType(type.getType(), getTypesForClass);
+ for (FieldValueTypeInformation type : fieldValueTypeSupplier.get(clazz)) {
+ Schema.FieldType fieldType = fieldFromType(type.getType(), fieldValueTypeSupplier);
if (type.isNullable()) {
builder.addNullableField(type.getName(), fieldType);
} else {
@@ -82,7 +96,7 @@ public class StaticSchemaInference {
// Map a Java field type to a Beam Schema FieldType.
private static Schema.FieldType fieldFromType(
- TypeDescriptor type, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) {
+ TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier) {
FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType());
if (primitiveType != null) {
return primitiveType;
@@ -95,7 +109,7 @@ public class StaticSchemaInference {
return FieldType.BYTES;
} else {
// Otherwise this is an array type.
- return FieldType.array(fieldFromType(component, getTypesForClass));
+ return FieldType.array(fieldFromType(component, fieldValueTypeSupplier));
}
} else if (type.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
TypeDescriptor<Collection<?>> collection = type.getSupertype(Collection.class);
@@ -103,7 +117,7 @@ public class StaticSchemaInference {
ParameterizedType ptype = (ParameterizedType) collection.getType();
java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
checkArgument(params.length == 1);
- return FieldType.array(fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass));
+ return FieldType.array(fieldFromType(TypeDescriptor.of(params[0]), fieldValueTypeSupplier));
} else {
throw new RuntimeException("Cannot infer schema from unparameterized collection.");
}
@@ -113,8 +127,8 @@ public class StaticSchemaInference {
ParameterizedType ptype = (ParameterizedType) map.getType();
java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
checkArgument(params.length == 2);
- FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass);
- FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), getTypesForClass);
+ FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), fieldValueTypeSupplier);
+ FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), fieldValueTypeSupplier);
checkArgument(
keyType.getTypeName().isPrimitiveType(),
"Only primitive types can be map keys. type: " + keyType.getTypeName());
@@ -129,7 +143,7 @@ public class StaticSchemaInference {
} else if (type.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
return FieldType.BYTES;
} else {
- return FieldType.row(schemaFromClass(type.getRawType(), getTypesForClass));
+ return FieldType.row(schemaFromClass(type.getRawType(), fieldValueTypeSupplier));
}
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
index 16b5a5f..9e8cd64 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
@@ -37,15 +37,19 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArraysBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedMapBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBeanWithAnnotations;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
/** Tests for the {@link JavaBeanSchema} schema provider. */
public class JavaBeanSchemaTest {
@@ -67,6 +71,21 @@ public class JavaBeanSchemaTest {
new StringBuilder(name).append("builder"));
}
+ private SimpleBeanWithAnnotations createAnnotated(String name) {
+ return new SimpleBeanWithAnnotations(
+ name,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ true,
+ DATE,
+ DATE.toInstant(),
+ BYTE_ARRAY,
+ BigDecimal.ONE,
+ new StringBuilder(name).append("builder"));
+ }
+
private Row createSimpleRow(String name) {
return Row.withSchema(SIMPLE_BEAN_SCHEMA)
.addValues(
@@ -238,9 +257,9 @@ public class JavaBeanSchemaTest {
NestedArrayBean bean = new NestedArrayBean(simple1, simple2, simple3);
Row row = registry.getToRowFunction(NestedArrayBean.class).apply(bean);
List<Row> rows = row.getArray("beans");
- assertSame(simple1, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(0)));
- assertSame(simple2, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(1)));
- assertSame(simple3, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(2)));
+ assertSame(simple1, registry.getFromRowFunction(SimpleBean.class).apply(rows.get(0)));
+ assertSame(simple2, registry.getFromRowFunction(SimpleBean.class).apply(rows.get(1)));
+ assertSame(simple3, registry.getFromRowFunction(SimpleBean.class).apply(rows.get(2)));
}
@Test
@@ -250,7 +269,6 @@ public class JavaBeanSchemaTest {
Row row1 = createSimpleRow("string1");
Row row2 = createSimpleRow("string2");
Row row3 = createSimpleRow("string3");
- ;
Row row = Row.withSchema(NESTED_ARRAY_BEAN_SCHEMA).addArray(row1, row2, row3).build();
NestedArrayBean bean = registry.getFromRowFunction(NestedArrayBean.class).apply(row);
@@ -334,4 +352,42 @@ public class JavaBeanSchemaTest {
assertEquals("string2", bean.getMap().get("simple2").getStr());
assertEquals("string3", bean.getMap().get("simple3").getStr());
}
+
+ @Test
+ public void testAnnotations() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema schema = registry.getSchema(SimpleBeanWithAnnotations.class);
+ SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
+
+ SimpleBeanWithAnnotations pojo = createAnnotated("string");
+ Row row = registry.getToRowFunction(SimpleBeanWithAnnotations.class).apply(pojo);
+ assertEquals(12, row.getFieldCount());
+ assertEquals("string", row.getString("str"));
+ assertEquals((byte) 1, (Object) row.getByte("aByte"));
+ assertEquals((short) 2, (Object) row.getInt16("aShort"));
+ assertEquals((int) 3, (Object) row.getInt32("anInt"));
+ assertEquals((long) 4, (Object) row.getInt64("aLong"));
+ assertEquals(true, (Object) row.getBoolean("aBoolean"));
+ assertEquals(DATE.toInstant(), row.getDateTime("dateTime"));
+ assertEquals(DATE.toInstant(), row.getDateTime("instant"));
+ assertArrayEquals(BYTE_ARRAY, row.getBytes("bytes"));
+ assertArrayEquals(BYTE_ARRAY, row.getBytes("byteBuffer"));
+ assertEquals(BigDecimal.ONE, row.getDecimal("bigDecimal"));
+ assertEquals("stringbuilder", row.getString("stringBuilder"));
+
+ SimpleBeanWithAnnotations pojo2 =
+ registry
+ .getFromRowFunction(SimpleBeanWithAnnotations.class)
+ .apply(createSimpleRow("string"));
+ assertEquals(pojo, pojo2);
+ }
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testMismatchingNullable() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ thrown.expect(RuntimeException.class);
+ Schema schema = registry.getSchema(MismatchingNullableBean.class);
+ }
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
index 87c42f2..31158bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.AnnotatedSimplePojo;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO;
@@ -49,6 +50,7 @@ import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNestedNullable;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithNullables;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.PrimitiveArrayPOJO;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.StaticCreationSimplePojo;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Instant;
@@ -78,6 +80,38 @@ public class JavaFieldSchemaTest {
new StringBuilder(name).append("builder"));
}
+ private AnnotatedSimplePojo createAnnotated(String name) {
+ return new AnnotatedSimplePojo(
+ name,
+ (byte) 1,
+ 4L,
+ (short) 2,
+ 3,
+ true,
+ DATE,
+ BigDecimal.ONE,
+ INSTANT,
+ BYTE_ARRAY,
+ BYTE_BUFFER,
+ new StringBuilder(name).append("builder"));
+ }
+
+ private StaticCreationSimplePojo createStaticCreation(String name) {
+ return StaticCreationSimplePojo.of(
+ name,
+ 4L,
+ (byte) 1,
+ (short) 2,
+ 3,
+ true,
+ DATE,
+ BYTE_BUFFER,
+ INSTANT,
+ BYTE_ARRAY,
+ BigDecimal.ONE,
+ new StringBuilder(name).append("builder"));
+ }
+
private Row createSimpleRow(String name) {
return Row.withSchema(SIMPLE_POJO_SCHEMA)
.addValues(
@@ -249,9 +283,9 @@ public class JavaFieldSchemaTest {
NestedArrayPOJO pojo = new NestedArrayPOJO(simple1, simple2, simple3);
Row row = registry.getToRowFunction(NestedArrayPOJO.class).apply(pojo);
List<Row> rows = row.getArray("pojos");
- assertSame(simple1, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(0)));
- assertSame(simple2, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(1)));
- assertSame(simple3, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(2)));
+ assertSame(simple1, registry.getFromRowFunction(SimplePOJO.class).apply(rows.get(0)));
+ assertSame(simple2, registry.getFromRowFunction(SimplePOJO.class).apply(rows.get(1)));
+ assertSame(simple3, registry.getFromRowFunction(SimplePOJO.class).apply(rows.get(2)));
}
@Test
@@ -386,4 +420,34 @@ public class JavaFieldSchemaTest {
registry.getFromRowFunction(POJOWithNestedNullable.class).apply(row);
assertNull(pojo.nested);
}
+
+ @Test
+ public void testAnnotations() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema schema = registry.getSchema(AnnotatedSimplePojo.class);
+ SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema);
+
+ Row simpleRow = createSimpleRow("string");
+ AnnotatedSimplePojo pojo = createAnnotated("string");
+ assertEquals(simpleRow, registry.getToRowFunction(AnnotatedSimplePojo.class).apply(pojo));
+
+ AnnotatedSimplePojo pojo2 =
+ registry.getFromRowFunction(AnnotatedSimplePojo.class).apply(simpleRow);
+ assertEquals(pojo, pojo2);
+ }
+
+ @Test
+ public void testStaticCreator() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema schema = registry.getSchema(StaticCreationSimplePojo.class);
+ SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema);
+
+ Row simpleRow = createSimpleRow("string");
+ StaticCreationSimplePojo pojo = createStaticCreation("string");
+ assertEquals(simpleRow, registry.getToRowFunction(StaticCreationSimplePojo.class).apply(pojo));
+
+ StaticCreationSimplePojo pojo2 =
+ registry.getFromRowFunction(StaticCreationSimplePojo.class).apply(simpleRow);
+ assertEquals(pojo, pojo2);
+ }
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
index 2610873..195e66a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import java.util.List;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -97,7 +98,8 @@ public class SchemaRegistryTest {
tryGetters(registry);
}
- static final class Provider implements SchemaProvider {
+ /** A test SchemaProvider. */
+ public static final class Provider implements SchemaProvider {
@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
if (typeDescriptor.equals(TypeDescriptors.strings())) {
@@ -174,7 +176,8 @@ public class SchemaRegistryTest {
@DefaultSchema(TestDefaultSchemaProvider.class)
static class TestDefaultSchemaClass {}
- static final class TestDefaultSchemaProvider implements SchemaProvider {
+ /** A test schema provider. */
+ public static final class TestDefaultSchemaProvider implements SchemaProvider {
@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
if (typeDescriptor.equals(TypeDescriptor.of(TestDefaultSchemaClass.class))) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
index aad6f83..3dab794 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Objects;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
index 4875850..762152f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/ConvertTest.java
@@ -22,10 +22,10 @@ import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
index c1f7993..0a0215c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/FilterTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.schemas.transforms;
import com.google.common.collect.Lists;
import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
index 4b2e323..d15e29c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/GroupTest.java
@@ -31,11 +31,11 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
index 718285a..d24ecfd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/SelectTest.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.schemas.transforms;
import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
index c835ea6..5467483 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -38,11 +38,11 @@ import java.util.List;
import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.FieldValueSetter;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.JavaBeanSchema.GetterTypeSupplier;
import org.apache.beam.sdk.schemas.JavaBeanSchema.SetterTypeSupplier;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray;
-import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedCollectionBean;
@@ -52,66 +52,66 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
import org.joda.time.DateTime;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
/** Tests for the {@link JavaBeanUtils} class. */
public class JavaBeanUtilsTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
-
@Test
public void testNullable() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, GetterTypeSupplier.INSTANCE);
assertTrue(schema.getField("str").getType().getNullable());
assertFalse(schema.getField("anInt").getType().getNullable());
}
@Test
- public void testMismatchingNullable() {
- thrown.expect(RuntimeException.class);
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class);
- }
-
- @Test
public void testSimpleBean() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, GetterTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
}
@Test
public void testNestedBean() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, GetterTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema);
}
@Test
public void testPrimitiveArray() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(
+ PrimitiveArrayBean.class, GetterTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema);
}
@Test
public void testNestedArray() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class, GetterTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema);
}
@Test
public void testNestedCollection() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(
+ NestedCollectionBean.class, GetterTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema);
}
@Test
public void testPrimitiveMap() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class, GetterTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema);
}
@Test
public void testNestedMap() {
- Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class);
+ Schema schema =
+ JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class, GetterTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema);
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
index e140a8b..697000c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
@@ -63,50 +63,56 @@ public class POJOUtilsTest {
@Test
public void testNullables() {
- Schema schema = POJOUtils.schemaFromPojoClass(POJOWithNullables.class);
+ Schema schema =
+ POJOUtils.schemaFromPojoClass(POJOWithNullables.class, JavaFieldTypeSupplier.INSTANCE);
assertTrue(schema.getField("str").getType().getNullable());
assertFalse(schema.getField("anInt").getType().getNullable());
}
@Test
public void testSimplePOJO() {
- Schema schema = POJOUtils.schemaFromPojoClass(SimplePOJO.class);
+ Schema schema = POJOUtils.schemaFromPojoClass(SimplePOJO.class, JavaFieldTypeSupplier.INSTANCE);
assertEquals(SIMPLE_POJO_SCHEMA, schema);
}
@Test
public void testNestedPOJO() {
- Schema schema = POJOUtils.schemaFromPojoClass(NestedPOJO.class);
+ Schema schema = POJOUtils.schemaFromPojoClass(NestedPOJO.class, JavaFieldTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_POJO_SCHEMA, schema);
}
@Test
public void testPrimitiveArray() {
- Schema schema = POJOUtils.schemaFromPojoClass(PrimitiveArrayPOJO.class);
+ Schema schema =
+ POJOUtils.schemaFromPojoClass(PrimitiveArrayPOJO.class, JavaFieldTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_POJO_SCHEMA, schema);
}
@Test
public void testNestedArray() {
- Schema schema = POJOUtils.schemaFromPojoClass(NestedArrayPOJO.class);
+ Schema schema =
+ POJOUtils.schemaFromPojoClass(NestedArrayPOJO.class, JavaFieldTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_POJO_SCHEMA, schema);
}
@Test
public void testNestedCollection() {
- Schema schema = POJOUtils.schemaFromPojoClass(NestedCollectionPOJO.class);
+ Schema schema =
+ POJOUtils.schemaFromPojoClass(NestedCollectionPOJO.class, JavaFieldTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_POJO_SCHEMA, schema);
}
@Test
public void testPrimitiveMap() {
- Schema schema = POJOUtils.schemaFromPojoClass(PrimitiveMapPOJO.class);
+ Schema schema =
+ POJOUtils.schemaFromPojoClass(PrimitiveMapPOJO.class, JavaFieldTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_POJO_SCHEMA, schema);
}
@Test
public void testNestedMap() {
- Schema schema = POJOUtils.schemaFromPojoClass(NestedMapPOJO.class);
+ Schema schema =
+ POJOUtils.schemaFromPojoClass(NestedMapPOJO.class, JavaFieldTypeSupplier.INSTANCE);
SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_POJO_SCHEMA, schema);
}
@@ -128,7 +134,7 @@ public class POJOUtilsTest {
new StringBuilder("stringBuilder"));
List<FieldValueGetter> getters =
- POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier());
+ POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, JavaFieldTypeSupplier.INSTANCE);
assertEquals(12, getters.size());
assertEquals("str", getters.get(0).name());
assertEquals("field1", getters.get(0).get(simplePojo));
@@ -150,7 +156,7 @@ public class POJOUtilsTest {
public void testGeneratedSimpleSetters() {
SimplePOJO simplePojo = new SimplePOJO();
List<FieldValueSetter> setters =
- POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier());
+ POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, JavaFieldTypeSupplier.INSTANCE);
assertEquals(12, setters.size());
setters.get(0).set(simplePojo, "field1");
@@ -186,7 +192,9 @@ public class POJOUtilsTest {
List<FieldValueGetter> getters =
POJOUtils.getGetters(
- POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier());
+ POJOWithBoxedFields.class,
+ POJO_WITH_BOXED_FIELDS_SCHEMA,
+ JavaFieldTypeSupplier.INSTANCE);
assertEquals((byte) 41, getters.get(0).get(pojo));
assertEquals((short) 42, getters.get(1).get(pojo));
assertEquals((int) 43, getters.get(2).get(pojo));
@@ -199,7 +207,9 @@ public class POJOUtilsTest {
POJOWithBoxedFields pojo = new POJOWithBoxedFields();
List<FieldValueSetter> setters =
POJOUtils.getSetters(
- POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier());
+ POJOWithBoxedFields.class,
+ POJO_WITH_BOXED_FIELDS_SCHEMA,
+ JavaFieldTypeSupplier.INSTANCE);
setters.get(0).set(pojo, (byte) 41);
setters.get(1).set(pojo, (short) 42);
@@ -219,7 +229,7 @@ public class POJOUtilsTest {
POJOWithByteArray pojo = new POJOWithByteArray();
List<FieldValueSetter> setters =
POJOUtils.getSetters(
- POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, new JavaFieldTypeSupplier());
+ POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, JavaFieldTypeSupplier.INSTANCE);
setters.get(0).set(pojo, BYTE_ARRAY);
setters.get(1).set(pojo, BYTE_BUFFER.array());
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
index e77adf7..e73d565 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
@@ -24,10 +24,13 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.joda.time.DateTime;
import org.joda.time.Instant;
@@ -310,6 +313,147 @@ public class TestJavaBeans {
.addStringField("stringBuilder")
.build();
+ /** A simple Bean containing basic types. * */
+ @DefaultSchema(JavaBeanSchema.class)
+ public static class SimpleBeanWithAnnotations {
+ private final String str;
+ private final byte aByte;
+ private final short aShort;
+ private final int anInt;
+ private final long aLong;
+ private final boolean aBoolean;
+ private final DateTime dateTime;
+ private final Instant instant;
+ private final byte[] bytes;
+ private final ByteBuffer byteBuffer;
+ private final BigDecimal bigDecimal;
+ private final StringBuilder stringBuilder;
+
+ @SchemaCreate
+ public SimpleBeanWithAnnotations(
+ String str,
+ byte aByte,
+ short aShort,
+ int anInt,
+ long aLong,
+ boolean aBoolean,
+ DateTime dateTime,
+ Instant instant,
+ byte[] bytes,
+ BigDecimal bigDecimal,
+ StringBuilder stringBuilder) {
+ this.str = str;
+ this.aByte = aByte;
+ this.aShort = aShort;
+ this.anInt = anInt;
+ this.aLong = aLong;
+ this.aBoolean = aBoolean;
+ this.dateTime = dateTime;
+ this.instant = instant;
+ this.bytes = bytes;
+ this.byteBuffer = ByteBuffer.wrap(bytes);
+ this.bigDecimal = bigDecimal;
+ this.stringBuilder = stringBuilder;
+ }
+
+ @SchemaIgnore
+ public String getUnknown() {
+ return "";
+ }
+
+ public String getStr() {
+ return str;
+ }
+
+ @SchemaFieldName("aByte")
+ public byte getTheByteByte() {
+ return aByte;
+ }
+
+ @SchemaFieldName("aShort")
+ public short getNotAShort() {
+ return aShort;
+ }
+
+ public int getAnInt() {
+ return anInt;
+ }
+
+ public long getaLong() {
+ return aLong;
+ }
+
+ public boolean isaBoolean() {
+ return aBoolean;
+ }
+
+ public DateTime getDateTime() {
+ return dateTime;
+ }
+
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return byteBuffer;
+ }
+
+ public Instant getInstant() {
+ return instant;
+ }
+
+ public BigDecimal getBigDecimal() {
+ return bigDecimal;
+ }
+
+ public StringBuilder getStringBuilder() {
+ return stringBuilder;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SimpleBeanWithAnnotations that = (SimpleBeanWithAnnotations) o;
+ return aByte == that.aByte
+ && aShort == that.aShort
+ && anInt == that.anInt
+ && aLong == that.aLong
+ && aBoolean == that.aBoolean
+ && Objects.equals(str, that.str)
+ && Objects.equals(dateTime, that.dateTime)
+ && Objects.equals(instant, that.instant)
+ && Arrays.equals(bytes, that.bytes)
+ && Objects.equals(byteBuffer, that.byteBuffer)
+ && Objects.equals(bigDecimal, that.bigDecimal)
+ && Objects.equals(stringBuilder.toString(), that.stringBuilder.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ int result =
+ Objects.hash(
+ str,
+ aByte,
+ aShort,
+ anInt,
+ aLong,
+ aBoolean,
+ dateTime,
+ instant,
+ byteBuffer,
+ bigDecimal,
+ stringBuilder.toString());
+ result = 31 * result + Arrays.hashCode(bytes);
+ return result;
+ }
+ }
+
/** A Bean containing a nested class. * */
@DefaultSchema(JavaBeanSchema.class)
public static class NestedBean {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
index f985980..0e2a2f4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
@@ -24,10 +24,13 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.joda.time.DateTime;
import org.joda.time.Instant;
@@ -101,6 +104,249 @@ public class TestPOJOs {
public static final Schema NESTED_NULLABLE_SCHEMA =
Schema.builder().addNullableField("nested", FieldType.row(NULLABLES_SCHEMA)).build();
+ /** A POJO for testing static factory methods. */
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class StaticCreationSimplePojo {
+ public final String str;
+ public final byte aByte;
+ public final short aShort;
+ public final int anInt;
+ public final long aLong;
+ public final boolean aBoolean;
+ public final DateTime dateTime;
+ public final Instant instant;
+ public final byte[] bytes;
+ public final ByteBuffer byteBuffer;
+ public final BigDecimal bigDecimal;
+ public final StringBuilder stringBuilder;
+
+ private StaticCreationSimplePojo(
+ String str,
+ byte aByte,
+ short aShort,
+ int anInt,
+ long aLong,
+ boolean aBoolean,
+ DateTime dateTime,
+ Instant instant,
+ byte[] bytes,
+ ByteBuffer byteBuffer,
+ BigDecimal bigDecimal,
+ StringBuilder stringBuilder) {
+ this.str = str;
+ this.aByte = aByte;
+ this.aShort = aShort;
+ this.anInt = anInt;
+ this.aLong = aLong;
+ this.aBoolean = aBoolean;
+ this.dateTime = dateTime;
+ this.instant = instant;
+ this.bytes = bytes;
+ this.byteBuffer = byteBuffer;
+ this.bigDecimal = bigDecimal;
+ this.stringBuilder = stringBuilder;
+ }
+
+ @SchemaCreate
+ public static StaticCreationSimplePojo of(
+ String str,
+ long aLong,
+ byte aByte,
+ short aShort,
+ int anInt,
+ boolean aBoolean,
+ DateTime dateTime,
+ ByteBuffer byteBuffer,
+ Instant instant,
+ byte[] bytes,
+ BigDecimal bigDecimal,
+ StringBuilder stringBuilder) {
+ return new StaticCreationSimplePojo(
+ str,
+ aByte,
+ aShort,
+ anInt,
+ aLong,
+ aBoolean,
+ dateTime,
+ instant,
+ bytes,
+ byteBuffer,
+ bigDecimal,
+ stringBuilder);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof StaticCreationSimplePojo)) {
+ return false;
+ }
+ StaticCreationSimplePojo that = (StaticCreationSimplePojo) o;
+ return aByte == that.aByte
+ && aShort == that.aShort
+ && anInt == that.anInt
+ && aLong == that.aLong
+ && aBoolean == that.aBoolean
+ && Objects.equals(str, that.str)
+ && Objects.equals(dateTime, that.dateTime)
+ && Objects.equals(instant, that.instant)
+ && Arrays.equals(bytes, that.bytes)
+ && Objects.equals(byteBuffer, that.byteBuffer)
+ && Objects.equals(bigDecimal, that.bigDecimal)
+ && Objects.equals(stringBuilder.toString(), that.stringBuilder.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ int result =
+ Objects.hash(
+ str,
+ aByte,
+ aShort,
+ anInt,
+ aLong,
+ aBoolean,
+ dateTime,
+ instant,
+ byteBuffer,
+ bigDecimal,
+ stringBuilder);
+ result = 31 * result + Arrays.hashCode(bytes);
+ return result;
+ }
+ }
+
+ /** A POJO for testing annotations. */
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class AnnotatedSimplePojo {
+ public final String str;
+
+ @SchemaFieldName("aByte")
+ public final byte theByte;
+
+ @SchemaFieldName("aShort")
+ public final short theShort;
+
+ public final int anInt;
+ public final long aLong;
+ public final boolean aBoolean;
+ public final DateTime dateTime;
+ public final Instant instant;
+ public final byte[] bytes;
+ public final ByteBuffer byteBuffer;
+ public final BigDecimal bigDecimal;
+ public final StringBuilder stringBuilder;
+ @SchemaIgnore public final Integer pleaseIgnore;
+
+ // Marked with SchemaCreate, so this will be called to construct instances.
+ @SchemaCreate
+ public AnnotatedSimplePojo(
+ String str,
+ byte theByte,
+ long aLong,
+ short theShort,
+ int anInt,
+ boolean aBoolean,
+ DateTime dateTime,
+ BigDecimal bigDecimal,
+ Instant instant,
+ byte[] bytes,
+ ByteBuffer byteBuffer,
+ StringBuilder stringBuilder) {
+ this.str = str;
+ this.theByte = theByte;
+ this.theShort = theShort;
+ this.anInt = anInt;
+ this.aLong = aLong;
+ this.aBoolean = aBoolean;
+ this.dateTime = dateTime;
+ this.instant = instant;
+ this.bytes = bytes;
+ this.byteBuffer = byteBuffer;
+ this.bigDecimal = bigDecimal;
+ this.stringBuilder = stringBuilder;
+ this.pleaseIgnore = 42;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AnnotatedSimplePojo that = (AnnotatedSimplePojo) o;
+ return theByte == that.theByte
+ && theShort == that.theShort
+ && anInt == that.anInt
+ && aLong == that.aLong
+ && aBoolean == that.aBoolean
+ && Objects.equals(str, that.str)
+ && Objects.equals(dateTime, that.dateTime)
+ && Objects.equals(instant, that.instant)
+ && Arrays.equals(bytes, that.bytes)
+ && Objects.equals(byteBuffer, that.byteBuffer)
+ && Objects.equals(bigDecimal, that.bigDecimal)
+ && Objects.equals(stringBuilder.toString(), that.stringBuilder.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ int result =
+ Objects.hash(
+ str,
+ theByte,
+ theShort,
+ anInt,
+ aLong,
+ aBoolean,
+ dateTime,
+ instant,
+ byteBuffer,
+ bigDecimal,
+ stringBuilder.toString());
+ result = 31 * result + Arrays.hashCode(bytes);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "AnnotatedSimplePojo{"
+ + "str='"
+ + str
+ + '\''
+ + ", theByte="
+ + theByte
+ + ", theShort="
+ + theShort
+ + ", anInt="
+ + anInt
+ + ", aLong="
+ + aLong
+ + ", aBoolean="
+ + aBoolean
+ + ", dateTime="
+ + dateTime
+ + ", instant="
+ + instant
+ + ", bytes="
+ + Arrays.toString(bytes)
+ + ", byteBuffer="
+ + byteBuffer
+ + ", bigDecimal="
+ + bigDecimal
+ + ", stringBuilder="
+ + stringBuilder
+ + ", pleaseIgnore="
+ + pleaseIgnore
+ + '}';
+ }
+ }
+
/** A simple POJO containing basic types. * */
@DefaultSchema(JavaFieldSchema.class)
public static class SimplePOJO {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
index dd843a0..60dc9cd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesSchema;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
index b471dc8..8aa1880 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Customer.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.extensions.sql.example.model;
import java.io.Serializable;
import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
/** Describes a customer. */
@DefaultSchema(JavaBeanSchema.class)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
index 6564732..999b2b8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/model/Order.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.extensions.sql.example.model;
import java.io.Serializable;
import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
/** Describes an order. */
@DefaultSchema(JavaBeanSchema.class)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java
index 829469a..7fc3d98 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/InferredJavaBeanSqlTest.java
@@ -21,9 +21,9 @@ import static org.apache.beam.sdk.extensions.sql.TestUtils.tuple;
import java.io.Serializable;
import java.util.Objects;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
index 5104f11..fe87fc9 100644
--- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
+++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
@@ -23,10 +23,10 @@ import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Objects;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
index 326154c..849c2f2 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.joda.time.Instant;
/** An auction submitted by a person. */
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
index 35092d7..d06e3bd 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
/** Result of Query5. */
@DefaultSchema(JavaFieldSchema.class)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
index 0a6c934..62e3386 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
/** Result of Query2. */
@DefaultSchema(JavaFieldSchema.class)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
index 18feea0..217bcff 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.joda.time.Instant;
/** A bid for an item on auction. */
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
index 70f2285..6013d83 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
@@ -26,8 +26,8 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
/**
* An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
index fe3a736..c985707 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
@@ -30,8 +30,8 @@ import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
/** Result of Query3. */
@DefaultSchema(JavaFieldSchema.class)
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
index c298e3f..2559f6b 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.schemas.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.joda.time.Instant;
/** A person either creating an auction or making a bid. */