You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/23 23:35:23 UTC

[beam] branch master updated: Merge pull request #21953 from Implement PubsubSchemaTransformMessageToRowFactory

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e94f33a49f3 Merge pull request #21953 from Implement PubsubSchemaTransformMessageToRowFactory
e94f33a49f3 is described below

commit e94f33a49f3b0b440bc8fbec3b1ce649a6c5832e
Author: Damon Douglas <da...@users.noreply.github.com>
AuthorDate: Thu Jun 23 16:35:17 2022 -0700

    Merge pull request #21953 from Implement PubsubSchemaTransformMessageToRowFactory
    
    * Implement PubsubSchemaTransformMessageToRowFactory
    
    * Patch class code comment
    
    * Patch adding serializer to build
    
    * Ask needsSerializer before applying
    
    * Patch nullness warning
    
    * Patch spotless check
---
 .../PubsubSchemaTransformMessageToRowFactory.java  | 180 +++++++++++
 ...bsubSchemaTransformMessageToRowFactoryTest.java | 329 +++++++++++++++++++++
 2 files changed, 509 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
new file mode 100644
index 00000000000..60f15fa1b59
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
+
+import com.google.common.base.Strings;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.compatqual.NonNullType;
+
+/**
+ * Builds a {@link PubsubMessageToRow} from a {@link PubsubSchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+class PubsubSchemaTransformMessageToRowFactory {
+  private static final String DEFAULT_FORMAT = "json";
+
+  private static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
+      Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
+  private static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
+      Schema.builder().addStringField("key").addStringField("value").build();
+  private static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
+      Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
+
+  private static final String THRIFT_CLASS_KEY = "thriftClass";
+  private static final String THRIFT_PROTOCOL_FACTORY_CLASS_KEY = "thriftProtocolFactoryClass";
+  private static final String PROTO_CLASS_KEY = "protoClass";
+
+  /**
+   * Creates a {@link PubsubSchemaTransformMessageToRowFactory} that takes its settings from the
+   * given {@link PubsubSchemaTransformReadConfiguration}.
+   */
+  static PubsubSchemaTransformMessageToRowFactory from(
+      @NonNullType PubsubSchemaTransformReadConfiguration configuration) {
+    return new PubsubSchemaTransformMessageToRowFactory(configuration);
+  }
+
+  /** Assembles the {@link PubsubMessageToRow} described by the configuration. */
+  PubsubMessageToRow buildMessageToRow() {
+    boolean hasDeadLetterQueue = !Strings.isNullOrEmpty(configuration.getDeadLetterQueue());
+    boolean flatSchema = !shouldUseNestedSchema();
+
+    PubsubMessageToRow.Builder result =
+        PubsubMessageToRow.builder()
+            .messageSchema(configuration.getDataSchema())
+            .useDlq(hasDeadLetterQueue)
+            .useFlatSchema(flatSchema);
+
+    // A serializer provider is attached only when needsSerializer() reports one is required.
+    return needsSerializer() ? result.serializerProvider(serializer()).build() : result.build();
+  }
+
+  private final PubsubSchemaTransformReadConfiguration configuration;
+  /** Private: instances are created via {@link #from(PubsubSchemaTransformReadConfiguration)}. */
+  private PubsubSchemaTransformMessageToRowFactory(
+      PubsubSchemaTransformReadConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  /** Looks up the {@link PayloadSerializer} for the configured format, defaulting to "json". */
+  private PayloadSerializer payloadSerializer() {
+    String configuredFormat = configuration.getFormat();
+    String format = Strings.isNullOrEmpty(configuredFormat) ? DEFAULT_FORMAT : configuredFormat;
+
+    String thriftClass = configuration.getThriftClass();
+    String thriftProtocolFactoryClass = configuration.getThriftProtocolFactoryClass();
+    String protoClass = configuration.getProtoClass();
+
+    // Optional class names are passed along only when they were configured.
+    ImmutableMap.Builder<String, Object> serializerParams = ImmutableMap.builder();
+    if (!Strings.isNullOrEmpty(thriftClass)) {
+      serializerParams.put(THRIFT_CLASS_KEY, thriftClass);
+    }
+    if (!Strings.isNullOrEmpty(thriftProtocolFactoryClass)) {
+      serializerParams.put(THRIFT_PROTOCOL_FACTORY_CLASS_KEY, thriftProtocolFactoryClass);
+    }
+    if (!Strings.isNullOrEmpty(protoClass)) {
+      serializerParams.put(PROTO_CLASS_KEY, protoClass);
+    }
+
+    Schema dataSchema = configuration.getDataSchema();
+    return PayloadSerializers.getSerializer(format, dataSchema, serializerParams.build());
+  }
+
+  PubsubMessageToRow.SerializerProvider serializer() {
+    return unusedSchema -> payloadSerializer(); // the configured data schema is used instead
+  }
+
+  /**
+   * Reports whether the {@link PubsubMessageToRow} must be given a {@link
+   * PubsubMessageToRow.SerializerProvider}.
+   *
+   * <p>A serializer is needed unless {@link #shouldUseNestedSchema()} is true and the {@link
+   * PubsubMessageToRow#PAYLOAD_FIELD} is present as {@link Schema.FieldType#BYTES}.
+   */
+  boolean needsSerializer() {
+    return !(shouldUseNestedSchema() && fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES));
+  }
+
+  /**
+   * Reports whether {@link PubsubSchemaTransformReadConfiguration#getDataSchema()} should be
+   * treated as a nested schema, i.e. it has both a valid payload and a valid attributes field.
+   */
+  boolean shouldUseNestedSchema() {
+    if (!schemaHasValidPayloadField()) {
+      return false;
+    }
+    return schemaHasValidAttributesField();
+  }
+
+  /**
+   * Reports whether {@link PubsubSchemaTransformReadConfiguration#getDataSchema()} declares a
+   * {@link PubsubMessageToRow#PAYLOAD_FIELD} whose type is either bytes or a row.
+   */
+  boolean schemaHasValidPayloadField() {
+    Schema dataSchema = configuration.getDataSchema();
+    if (!dataSchema.hasField(PAYLOAD_FIELD)) {
+      return false;
+    }
+
+    // A BYTES payload is valid; otherwise the payload must be of type ROW.
+    return fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES)
+        || ROW.equals(dataSchema.getField(PAYLOAD_FIELD).getType().getTypeName());
+  }
+
+  /**
+   * Reports whether {@link PubsubSchemaTransformReadConfiguration#getDataSchema()} declares a
+   * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} of a supported type.
+   *
+   * <p>Supported types are {@link #ATTRIBUTE_MAP_FIELD_TYPE} and {@link
+   * #ATTRIBUTE_ARRAY_FIELD_TYPE}; each is checked with {@link #fieldPresent(String,
+   * Schema.FieldType)}.
+   */
+  boolean schemaHasValidAttributesField() {
+    boolean isMap = fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE);
+    return isMap || fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
+  }
+
+  /** Whether the data schema has {@code field} with a type equivalent to {@code expectedType}. */
+  boolean fieldPresent(String field, Schema.FieldType expectedType) {
+    Schema dataSchema = configuration.getDataSchema();
+    if (!dataSchema.hasField(field)) {
+      return false;
+    }
+    // The comparison uses EquivalenceNullablePolicy.IGNORE for the two types.
+    Schema.FieldType actualType = dataSchema.getField(field).getType();
+    return expectedType.equivalent(actualType, Schema.EquivalenceNullablePolicy.IGNORE);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
new file mode 100644
index 00000000000..5750a7c0301
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static org.junit.Assert.*;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.*;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link PubsubSchemaTransformMessageToRowFactory}. */
+@RunWith(JUnit4.class)
+public class PubsubSchemaTransformMessageToRowFactoryTest {
+
+  List<TestCase> cases =
+      Arrays.asList(
+          testCase(PubsubSchemaTransformReadConfiguration.builder().setDataSchema(SCHEMA))
+              .expectPayloadSerializerProvider(JSON_PAYLOAD_SERIALIZER_PROVIDER)
+              .withSerializerInput(),
+          testCase(PubsubSchemaTransformReadConfiguration.builder().setDataSchema(SCHEMA))
+              .expectPubsubToRow(
+                  PubsubMessageToRow.builder()
+                      .messageSchema(SCHEMA)
+                      .useFlatSchema(true)
+                      .useDlq(false)),
+          testCase(
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setDataSchema(SCHEMA)
+                      .setDeadLetterQueue("projects/project/topics/topic"))
+              .expectPubsubToRow(
+                  PubsubMessageToRow.builder()
+                      .messageSchema(SCHEMA)
+                      .useFlatSchema(true)
+                      .useDlq(true)),
+          testCase(
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setDataSchema(SCHEMA)
+                      .setFormat("avro"))
+              .expectPayloadSerializerProvider(AVRO_PAYLOAD_SERIALIZER_PROVIDER)
+              .withSerializerInput(),
+          testCase(
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY)))
+              .schemaShouldHaveValidAttributesField()
+              .fieldShouldBePresent(
+                  ATTRIBUTES_FIELD_ARRAY.getName(), ATTRIBUTES_FIELD_ARRAY.getType()),
+          testCase(
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setDataSchema(Schema.of(ATTRIBUTES_FIELD_MAP)))
+              .schemaShouldHaveValidAttributesField()
+              .fieldShouldBePresent(ATTRIBUTES_FIELD_MAP.getName(), ATTRIBUTES_FIELD_MAP.getType()),
+          testCase(
+              PubsubSchemaTransformReadConfiguration.builder()
+                  .setDataSchema(Schema.of(ATTRIBUTES_FIELD_SHOULD_NOT_MATCH))),
+          testCase(
+              PubsubSchemaTransformReadConfiguration.builder()
+                  .setDataSchema(Schema.of(PAYLOAD_FIELD_SHOULD_NOT_MATCH))),
+          testCase(
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setDataSchema(Schema.of(PAYLOAD_FIELD_BYTES)))
+              .schemaShouldHaveValidPayloadField()
+              .fieldShouldBePresent(PAYLOAD_FIELD_BYTES.getName(), PAYLOAD_FIELD_BYTES.getType()),
+          testCase(
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setDataSchema(Schema.of(PAYLOAD_FIELD_ROW)))
+              .schemaShouldHaveValidPayloadField()
+              .fieldShouldBePresent(PAYLOAD_FIELD_ROW.getName(), PAYLOAD_FIELD_ROW.getType()),
+          testCase(
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES)))
+              .schemaShouldHaveValidAttributesField()
+              .schemaShouldHaveValidPayloadField()
+              .shouldUseNestedSchema()
+              .shouldNotNeedSerializer()
+              .expectPubsubToRow(
+                  PubsubMessageToRow.builder()
+                      .messageSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES))
+                      .useFlatSchema(false)
+                      .useDlq(false)));
+
+  static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
+      Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
+  static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
+      Schema.builder().addStringField("key").addStringField("value").build();
+
+  static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
+      Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
+
+  private static final Schema.Field ATTRIBUTES_FIELD_SHOULD_NOT_MATCH =
+      Schema.Field.of(ATTRIBUTES_FIELD, Schema.FieldType.STRING);
+
+  private static final Schema.Field ATTRIBUTES_FIELD_MAP =
+      Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE);
+
+  private static final Schema.Field ATTRIBUTES_FIELD_ARRAY =
+      Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
+
+  private static final Schema.Field PAYLOAD_FIELD_SHOULD_NOT_MATCH =
+      Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.STRING);
+
+  private static final Schema.Field PAYLOAD_FIELD_BYTES =
+      Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.BYTES);
+
+  private static final Schema.Field PAYLOAD_FIELD_ROW =
+      Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.row(Schema.of()));
+
+  private static final PayloadSerializerProvider JSON_PAYLOAD_SERIALIZER_PROVIDER =
+      new JsonPayloadSerializerProvider();
+
+  private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER =
+      new AvroPayloadSerializerProvider();
+
+  private static final Schema SCHEMA =
+      Schema.of(
+          Schema.Field.of("name", Schema.FieldType.STRING),
+          Schema.Field.of("number", Schema.FieldType.INT64));
+
+  private static final Row ROW =
+      Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build();
+
+  @Test
+  public void testBuildMessageToRow() {
+    for (TestCase testCase : cases) {
+      PubsubMessageToRow want = testCase.expectPubsubToRow;
+      if (want == null) {
+        // This case does not declare an expected PubsubMessageToRow.
+        continue;
+      }
+      PubsubMessageToRow got = testCase.factory().buildMessageToRow();
+
+      assertEquals("messageSchema", want.messageSchema(), got.messageSchema());
+      assertEquals("useFlatSchema", want.useFlatSchema(), got.useFlatSchema());
+      assertEquals("useDlq", want.useDlq(), got.useDlq());
+    }
+  }
+
+  /**
+   * Serializes one row with both the expected and the factory-built serializer, then compares.
+   */
+  @Test
+  public void serializer() {
+    for (TestCase testCase : cases) {
+      if (testCase.expectPayloadSerializerProvider == null) {
+        // This case does not declare an expected serializer provider.
+        continue;
+      }
+
+      Schema dataSchema = testCase.dataSchema();
+      Row row = Objects.requireNonNull(testCase.serializerInput);
+
+      byte[] want = testCase.expectSerializerProvider().apply(dataSchema).serialize(row);
+      byte[] got = testCase.factory().serializer().apply(dataSchema).serialize(row);
+
+      // Both byte arrays are decoded as UTF-8 and compared as Strings.
+      String wantText = new String(want, StandardCharsets.UTF_8);
+      String gotText = new String(got, StandardCharsets.UTF_8);
+
+      assertEquals(wantText, gotText);
+    }
+  }
+
+  /** Compares {@code needsSerializer()} with the expectation recorded on each case. */
+  @Test
+  public void needsSerializer() {
+    for (TestCase testCase : cases) {
+      // Defaults to true unless the case called shouldNotNeedSerializer().
+      boolean want = testCase.shouldNeedSerializer;
+      boolean got = testCase.factory().needsSerializer();
+
+      assertEquals(want, got);
+    }
+  }
+
+  /** Compares {@code shouldUseNestedSchema()} with the expectation recorded on each case. */
+  @Test
+  public void shouldUseNestedSchema() {
+    for (TestCase testCase : cases) {
+      // Defaults to false unless the case called shouldUseNestedSchema().
+      boolean want = testCase.shouldUseNestedSchema;
+      boolean got = testCase.factory().shouldUseNestedSchema();
+
+      assertEquals(want, got);
+    }
+  }
+
+  /** Compares {@code schemaHasValidPayloadField()} with the expectation recorded on each case. */
+  @Test
+  public void schemaHasValidPayloadField() {
+    for (TestCase testCase : cases) {
+      // Defaults to false unless the case called schemaShouldHaveValidPayloadField().
+      boolean want = testCase.shouldSchemaHaveValidPayloadField;
+      boolean got = testCase.factory().schemaHasValidPayloadField();
+
+      assertEquals(want, got);
+    }
+  }
+
+  /** Compares {@code schemaHasValidAttributesField()} with each case's recorded expectation. */
+  @Test
+  public void schemaHasValidAttributesField() {
+    for (TestCase testCase : cases) {
+      // Defaults to false unless the case called schemaShouldHaveValidAttributesField().
+      boolean want = testCase.shouldSchemaHaveValidAttributesField;
+      boolean got = testCase.factory().schemaHasValidAttributesField();
+
+      assertEquals(want, got);
+    }
+  }
+
+  /** Checks every (field name, type) expectation a case registered via fieldShouldBePresent. */
+  @Test
+  public void fieldPresent() {
+    for (TestCase testCase : cases) {
+      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
+      for (Map.Entry<Pair<String, Schema.FieldType>, Boolean> expectation :
+          testCase.shouldFieldPresent.entrySet()) {
+        String fieldName = expectation.getKey().getKey();
+        Schema.FieldType fieldType = expectation.getKey().getValue();
+        boolean want = expectation.getValue();
+        assertEquals(want, factory.fieldPresent(fieldName, fieldType));
+      }
+    }
+  }
+
+  static TestCase testCase(PubsubSchemaTransformReadConfiguration.Builder configurationBuilder) {
+    return new TestCase(configurationBuilder); // the constructor builds the configuration
+  }
+
+  private static class TestCase { // one table-driven case: a configuration plus its expectations
+    private final PubsubSchemaTransformReadConfiguration configuration;
+
+    private PubsubMessageToRow expectPubsubToRow; // null: testBuildMessageToRow skips the case
+
+    private PayloadSerializerProvider expectPayloadSerializerProvider; // null: serializer() skips
+
+    private boolean shouldUseNestedSchema = false;
+    private boolean shouldNeedSerializer = true; // a serializer is expected unless opted out
+    private boolean shouldSchemaHaveValidPayloadField = false;
+    private boolean shouldSchemaHaveValidAttributesField = false;
+    private final Map<Pair<String, Schema.FieldType>, Boolean> shouldFieldPresent = new HashMap<>();
+
+    private Row serializerInput; // must be set when a serializer provider is expected
+
+    TestCase(PubsubSchemaTransformReadConfiguration.Builder configurationBuilder) {
+      this.configuration = configurationBuilder.build();
+    }
+
+    PubsubSchemaTransformMessageToRowFactory factory() { // a new factory on every call
+      return PubsubSchemaTransformMessageToRowFactory.from(configuration);
+    }
+
+    Schema dataSchema() {
+      return configuration.getDataSchema();
+    }
+
+    TestCase expectPubsubToRow(PubsubMessageToRow.Builder pubsubMessageToRowBuilder) {
+      this.expectPubsubToRow = pubsubMessageToRowBuilder.build();
+      return this;
+    }
+
+    TestCase withSerializerInput() { // uses the shared ROW fixture
+      this.serializerInput = PubsubSchemaTransformMessageToRowFactoryTest.ROW;
+      return this;
+    }
+
+    TestCase expectPayloadSerializerProvider(PayloadSerializerProvider value) {
+      this.expectPayloadSerializerProvider = value;
+      return this;
+    }
+
+    PubsubMessageToRow.SerializerProvider expectSerializerProvider() {
+      ImmutableMap.Builder<String, Object> params = ImmutableMap.builder(); // no parameters
+      PayloadSerializer payloadSerializer =
+          expectPayloadSerializerProvider.getSerializer(
+              configuration.getDataSchema(), params.build());
+
+      return (input -> payloadSerializer); // the schema passed to apply() is not used
+    }
+
+    TestCase shouldUseNestedSchema() {
+      this.shouldUseNestedSchema = true;
+      return this;
+    }
+
+    TestCase shouldNotNeedSerializer() {
+      this.shouldNeedSerializer = false;
+      return this;
+    }
+
+    TestCase schemaShouldHaveValidPayloadField() {
+      this.shouldSchemaHaveValidPayloadField = true;
+      return this;
+    }
+
+    TestCase schemaShouldHaveValidAttributesField() {
+      this.shouldSchemaHaveValidAttributesField = true;
+      return this;
+    }
+
+    TestCase fieldShouldBePresent(String name, Schema.FieldType expectedType) {
+      this.shouldFieldPresent.put(Pair.of(name, expectedType), true);
+      return this;
+    }
+  }
+}