You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/23 23:35:23 UTC
[beam] branch master updated: Merge pull request #21953 from Implement PubsubSchemaTransformMessageToRowFactory
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e94f33a49f3 Merge pull request #21953 from Implement PubsubSchemaTransformMessageToRowFactory
e94f33a49f3 is described below
commit e94f33a49f3b0b440bc8fbec3b1ce649a6c5832e
Author: Damon Douglas <da...@users.noreply.github.com>
AuthorDate: Thu Jun 23 16:35:17 2022 -0700
Merge pull request #21953 from Implement PubsubSchemaTransformMessageToRowFactory
* Implement PubsubSchemaTransformMessageToRowFactory
* Patch class code comment
* Patch adding serializer to build
* Ask needsSerializer before applying
* Patch nullness warning
* Patch spotless check
---
.../PubsubSchemaTransformMessageToRowFactory.java | 180 +++++++++++
...bsubSchemaTransformMessageToRowFactoryTest.java | 329 +++++++++++++++++++++
2 files changed, 509 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
new file mode 100644
index 00000000000..60f15fa1b59
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
+
+import com.google.common.base.Strings;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.compatqual.NonNullType;
+
+/**
+ * Builds a {@link PubsubMessageToRow} from a {@link PubsubSchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+class PubsubSchemaTransformMessageToRowFactory {
+ private static final String DEFAULT_FORMAT = "json";
+
+ private static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
+ Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
+ private static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
+ Schema.builder().addStringField("key").addStringField("value").build();
+ private static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
+ Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
+
+ private static final String THRIFT_CLASS_KEY = "thriftClass";
+ private static final String THRIFT_PROTOCOL_FACTORY_CLASS_KEY = "thriftProtocolFactoryClass";
+ private static final String PROTO_CLASS_KEY = "protoClass";
+
+ /**
+ * Instantiate a {@link PubsubSchemaTransformMessageToRowFactory} from a {@link
+ * PubsubSchemaTransformReadConfiguration}.
+ */
+ static PubsubSchemaTransformMessageToRowFactory from(
+ @NonNullType PubsubSchemaTransformReadConfiguration configuration) {
+ return new PubsubSchemaTransformMessageToRowFactory(configuration);
+ }
+
+ /** Build the {@link PubsubMessageToRow}. */
+ PubsubMessageToRow buildMessageToRow() {
+ PubsubMessageToRow.Builder builder =
+ PubsubMessageToRow.builder()
+ .messageSchema(configuration.getDataSchema())
+ .useDlq(!Strings.isNullOrEmpty(configuration.getDeadLetterQueue()))
+ .useFlatSchema(!shouldUseNestedSchema());
+
+ if (needsSerializer()) {
+ builder = builder.serializerProvider(serializer());
+ }
+
+ return builder.build();
+ }
+
+ private final PubsubSchemaTransformReadConfiguration configuration;
+
+ private PubsubSchemaTransformMessageToRowFactory(
+ PubsubSchemaTransformReadConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ private PayloadSerializer payloadSerializer() {
+ Schema schema = configuration.getDataSchema();
+ String format = DEFAULT_FORMAT;
+
+ if (!Strings.isNullOrEmpty(configuration.getFormat())) {
+ format = configuration.getFormat();
+ }
+
+ ImmutableMap.Builder<String, Object> params = ImmutableMap.builder();
+
+ if (!Strings.isNullOrEmpty(configuration.getThriftClass())) {
+ params.put(THRIFT_CLASS_KEY, configuration.getThriftClass());
+ }
+
+ if (!Strings.isNullOrEmpty(configuration.getThriftProtocolFactoryClass())) {
+ params.put(THRIFT_PROTOCOL_FACTORY_CLASS_KEY, configuration.getThriftProtocolFactoryClass());
+ }
+
+ if (!Strings.isNullOrEmpty(configuration.getProtoClass())) {
+ params.put(PROTO_CLASS_KEY, configuration.getProtoClass());
+ }
+
+ return PayloadSerializers.getSerializer(format, schema, params.build());
+ }
+
+ PubsubMessageToRow.SerializerProvider serializer() {
+ return input -> payloadSerializer();
+ }
+
+ /**
+ * Determines whether the {@link PubsubMessageToRow} needs a {@link
+ * PubsubMessageToRow.SerializerProvider}.
+ *
+ * <p>The determination is based on {@link #shouldUseNestedSchema()} is false or if the {@link
+ * PubsubMessageToRow#PAYLOAD_FIELD} is not present.
+ */
+ boolean needsSerializer() {
+ return !shouldUseNestedSchema() || !fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES);
+ }
+
+ /**
+ * Determines whether a nested schema should be used for {@link
+ * PubsubSchemaTransformReadConfiguration#getDataSchema()}.
+ *
+ * <p>The determination is based on {@link #schemaHasValidPayloadField()} and {@link
+ * #schemaHasValidAttributesField()}}
+ */
+ boolean shouldUseNestedSchema() {
+ return schemaHasValidPayloadField() && schemaHasValidAttributesField();
+ }
+
+ /**
+ * Determines whether {@link PubsubSchemaTransformReadConfiguration#getDataSchema()} has a valid
+ * {@link PubsubMessageToRow#PAYLOAD_FIELD}.
+ */
+ boolean schemaHasValidPayloadField() {
+ Schema schema = configuration.getDataSchema();
+ if (!schema.hasField(PAYLOAD_FIELD)) {
+ return false;
+ }
+ if (fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES)) {
+ return true;
+ }
+ return schema.getField(PAYLOAD_FIELD).getType().getTypeName().equals(ROW);
+ }
+
+ /**
+ * Determines whether {@link PubsubSchemaTransformReadConfiguration#getDataSchema()} has a valid
+ * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} field.
+ *
+ * <p>The determination is based on whether {@link #fieldPresent(String, Schema.FieldType)} for
+ * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} is true for either {@link
+ * #ATTRIBUTE_MAP_FIELD_TYPE} or {@link #ATTRIBUTE_ARRAY_FIELD_TYPE} {@link Schema.FieldType}s.
+ */
+ boolean schemaHasValidAttributesField() {
+ return fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE)
+ || fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
+ }
+
+ /**
+ * Determines whether {@link PubsubSchemaTransformReadConfiguration#getDataSchema()} contains the
+ * field and whether that field is an expectedType {@link Schema.FieldType}.
+ */
+ boolean fieldPresent(String field, Schema.FieldType expectedType) {
+ Schema schema = configuration.getDataSchema();
+ return schema.hasField(field)
+ && expectedType.equivalent(
+ schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
new file mode 100644
index 00000000000..5750a7c0301
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static org.junit.Assert.*;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.*;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link PubsubSchemaTransformMessageToRowFactory}. */
+@RunWith(JUnit4.class)
+public class PubsubSchemaTransformMessageToRowFactoryTest {
+
+ List<TestCase> cases =
+ Arrays.asList(
+ testCase(PubsubSchemaTransformReadConfiguration.builder().setDataSchema(SCHEMA))
+ .expectPayloadSerializerProvider(JSON_PAYLOAD_SERIALIZER_PROVIDER)
+ .withSerializerInput(),
+ testCase(PubsubSchemaTransformReadConfiguration.builder().setDataSchema(SCHEMA))
+ .expectPubsubToRow(
+ PubsubMessageToRow.builder()
+ .messageSchema(SCHEMA)
+ .useFlatSchema(true)
+ .useDlq(false)),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(SCHEMA)
+ .setDeadLetterQueue("projects/project/topics/topic"))
+ .expectPubsubToRow(
+ PubsubMessageToRow.builder()
+ .messageSchema(SCHEMA)
+ .useFlatSchema(true)
+ .useDlq(true)),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(SCHEMA)
+ .setFormat("avro"))
+ .expectPayloadSerializerProvider(AVRO_PAYLOAD_SERIALIZER_PROVIDER)
+ .withSerializerInput(),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY)))
+ .schemaShouldHaveValidAttributesField()
+ .fieldShouldBePresent(
+ ATTRIBUTES_FIELD_ARRAY.getName(), ATTRIBUTES_FIELD_ARRAY.getType()),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(Schema.of(ATTRIBUTES_FIELD_MAP)))
+ .schemaShouldHaveValidAttributesField()
+ .fieldShouldBePresent(ATTRIBUTES_FIELD_MAP.getName(), ATTRIBUTES_FIELD_MAP.getType()),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(Schema.of(ATTRIBUTES_FIELD_SHOULD_NOT_MATCH))),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(Schema.of(PAYLOAD_FIELD_SHOULD_NOT_MATCH))),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(Schema.of(PAYLOAD_FIELD_BYTES)))
+ .schemaShouldHaveValidPayloadField()
+ .fieldShouldBePresent(PAYLOAD_FIELD_BYTES.getName(), PAYLOAD_FIELD_BYTES.getType()),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(Schema.of(PAYLOAD_FIELD_ROW)))
+ .schemaShouldHaveValidPayloadField()
+ .fieldShouldBePresent(PAYLOAD_FIELD_ROW.getName(), PAYLOAD_FIELD_ROW.getType()),
+ testCase(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES)))
+ .schemaShouldHaveValidAttributesField()
+ .schemaShouldHaveValidPayloadField()
+ .shouldUseNestedSchema()
+ .shouldNotNeedSerializer()
+ .expectPubsubToRow(
+ PubsubMessageToRow.builder()
+ .messageSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES))
+ .useFlatSchema(false)
+ .useDlq(false)));
+
+ static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
+ Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
+ static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
+ Schema.builder().addStringField("key").addStringField("value").build();
+
+ static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
+ Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
+
+ private static final Schema.Field ATTRIBUTES_FIELD_SHOULD_NOT_MATCH =
+ Schema.Field.of(ATTRIBUTES_FIELD, Schema.FieldType.STRING);
+
+ private static final Schema.Field ATTRIBUTES_FIELD_MAP =
+ Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE);
+
+ private static final Schema.Field ATTRIBUTES_FIELD_ARRAY =
+ Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
+
+ private static final Schema.Field PAYLOAD_FIELD_SHOULD_NOT_MATCH =
+ Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.STRING);
+
+ private static final Schema.Field PAYLOAD_FIELD_BYTES =
+ Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.BYTES);
+
+ private static final Schema.Field PAYLOAD_FIELD_ROW =
+ Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.row(Schema.of()));
+
+ private static final PayloadSerializerProvider JSON_PAYLOAD_SERIALIZER_PROVIDER =
+ new JsonPayloadSerializerProvider();
+
+ private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER =
+ new AvroPayloadSerializerProvider();
+
+ private static final Schema SCHEMA =
+ Schema.of(
+ Schema.Field.of("name", Schema.FieldType.STRING),
+ Schema.Field.of("number", Schema.FieldType.INT64));
+
+ private static final Row ROW =
+ Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build();
+
+ @Test
+ public void testBuildMessageToRow() {
+ for (TestCase testCase : cases) {
+ if (testCase.expectPubsubToRow == null) continue;
+
+ PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+
+ PubsubMessageToRow expected = testCase.expectPubsubToRow;
+ PubsubMessageToRow actual = factory.buildMessageToRow();
+
+ assertEquals("messageSchema", expected.messageSchema(), actual.messageSchema());
+ assertEquals("useFlatSchema", expected.useFlatSchema(), actual.useFlatSchema());
+ assertEquals("useDlq", expected.useDlq(), actual.useDlq());
+ }
+ }
+
+ @Test
+ public void serializer() {
+ for (TestCase testCase : cases) {
+ PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+
+ if (testCase.expectPayloadSerializerProvider == null) continue;
+
+ Row serializerInput = Objects.requireNonNull(testCase.serializerInput);
+
+ byte[] expectedBytes =
+ testCase
+ .expectSerializerProvider()
+ .apply(testCase.dataSchema())
+ .serialize(serializerInput);
+
+ byte[] actualBytes =
+ factory.serializer().apply(testCase.dataSchema()).serialize(serializerInput);
+
+ String expected = new String(expectedBytes, StandardCharsets.UTF_8);
+ String actual = new String(actualBytes, StandardCharsets.UTF_8);
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void needsSerializer() {
+ for (TestCase testCase : cases) {
+ PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+
+ boolean expected = testCase.shouldNeedSerializer;
+ boolean actual = factory.needsSerializer();
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void shouldUseNestedSchema() {
+ for (TestCase testCase : cases) {
+ PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+
+ boolean expected = testCase.shouldUseNestedSchema;
+ boolean actual = factory.shouldUseNestedSchema();
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void schemaHasValidPayloadField() {
+ for (TestCase testCase : cases) {
+ PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+
+ boolean expected = testCase.shouldSchemaHaveValidPayloadField;
+ boolean actual = factory.schemaHasValidPayloadField();
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void schemaHasValidAttributesField() {
+ for (TestCase testCase : cases) {
+ PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+
+ boolean expected = testCase.shouldSchemaHaveValidAttributesField;
+ boolean actual = factory.schemaHasValidAttributesField();
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void fieldPresent() {
+ for (TestCase testCase : cases) {
+ PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+ for (Map.Entry<Pair<String, Schema.FieldType>, Boolean> entry :
+ testCase.shouldFieldPresent.entrySet()) {
+ Pair<String, Schema.FieldType> pair = entry.getKey();
+ boolean expected = entry.getValue();
+ boolean actual = factory.fieldPresent(pair.getKey(), pair.getValue());
+
+ assertEquals(expected, actual);
+ }
+ }
+ }
+
+ static TestCase testCase(PubsubSchemaTransformReadConfiguration.Builder configurationBuilder) {
+ return new TestCase(configurationBuilder);
+ }
+
+ private static class TestCase {
+ private final PubsubSchemaTransformReadConfiguration configuration;
+
+ private PubsubMessageToRow expectPubsubToRow;
+
+ private PayloadSerializerProvider expectPayloadSerializerProvider;
+
+ private boolean shouldUseNestedSchema = false;
+ private boolean shouldNeedSerializer = true;
+ private boolean shouldSchemaHaveValidPayloadField = false;
+ private boolean shouldSchemaHaveValidAttributesField = false;
+ private final Map<Pair<String, Schema.FieldType>, Boolean> shouldFieldPresent = new HashMap<>();
+
+ private Row serializerInput;
+
+ TestCase(PubsubSchemaTransformReadConfiguration.Builder configurationBuilder) {
+ this.configuration = configurationBuilder.build();
+ }
+
+ PubsubSchemaTransformMessageToRowFactory factory() {
+ return PubsubSchemaTransformMessageToRowFactory.from(configuration);
+ }
+
+ Schema dataSchema() {
+ return configuration.getDataSchema();
+ }
+
+ TestCase expectPubsubToRow(PubsubMessageToRow.Builder pubsubMessageToRowBuilder) {
+ this.expectPubsubToRow = pubsubMessageToRowBuilder.build();
+ return this;
+ }
+
+ TestCase withSerializerInput() {
+ this.serializerInput = PubsubSchemaTransformMessageToRowFactoryTest.ROW;
+ return this;
+ }
+
+ TestCase expectPayloadSerializerProvider(PayloadSerializerProvider value) {
+ this.expectPayloadSerializerProvider = value;
+ return this;
+ }
+
+ PubsubMessageToRow.SerializerProvider expectSerializerProvider() {
+ ImmutableMap.Builder<String, Object> params = ImmutableMap.builder();
+ PayloadSerializer payloadSerializer =
+ expectPayloadSerializerProvider.getSerializer(
+ configuration.getDataSchema(), params.build());
+
+ return (input -> payloadSerializer);
+ }
+
+ TestCase shouldUseNestedSchema() {
+ this.shouldUseNestedSchema = true;
+ return this;
+ }
+
+ TestCase shouldNotNeedSerializer() {
+ this.shouldNeedSerializer = false;
+ return this;
+ }
+
+ TestCase schemaShouldHaveValidPayloadField() {
+ this.shouldSchemaHaveValidPayloadField = true;
+ return this;
+ }
+
+ TestCase schemaShouldHaveValidAttributesField() {
+ this.shouldSchemaHaveValidAttributesField = true;
+ return this;
+ }
+
+ TestCase fieldShouldBePresent(String name, Schema.FieldType expectedType) {
+ this.shouldFieldPresent.put(Pair.of(name, expectedType), true);
+ return this;
+ }
+ }
+}