Posted to commits@beam.apache.org by jo...@apache.org on 2022/11/11 19:12:18 UTC

[beam] branch master updated: Implement PubsubRowToMessage transform (#23897)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fcd20c3712 Implement PubsubRowToMessage transform (#23897)
9fcd20c3712 is described below

commit 9fcd20c3712536f2d4580beead678cdbb6fd4746
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Fri Nov 11 11:12:11 2022 -0800

    Implement PubsubRowToMessage transform (#23897)
    
    * Begin PubsubRowToMessage Impl
    
    * Complete working draft
    
    * Unit tests validate user and non-user fields
    
    * Finish tests on supporting methods
    
    * Pass checks before finalizing tests
    
    * WIP
    
    * fix timestamp
    
    * finalize tests
    
    * Finalize code comments
    
    * Clean up check findings
    
    * Add InputSchemaFactory
    
    * Patch code comment typo
---
 .../beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java |  577 +++++++++++
 .../sdk/io/gcp/pubsub/PubsubRowToMessageTest.java  | 1085 ++++++++++++++++++++
 2 files changed, 1662 insertions(+)
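
For context, a minimal usage sketch of the new transform, based only on the code in this diff.
PubsubRowToMessage is package-private and marked @Internal/@Experimental, so this is illustrative
only; the surrounding pipeline, the input schema, and the serializer choice are assumptions.

    // Hypothetical caller inside org.apache.beam.sdk.io.gcp.pubsub (the class is package-private);
    // "pipeline" is an assumed Pipeline instance.
    Schema schema = Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
    PayloadSerializer serializer =
        new JsonPayloadSerializerProvider().getSerializer(schema, ImmutableMap.of());

    // Input with user fields (no payload field) requires a PayloadSerializer, per validate().
    PubsubRowToMessage transform =
        PubsubRowToMessage.builder().setPayloadSerializer(serializer).build();

    PCollectionTuple result =
        pipeline
            .apply(Create.of(Row.withSchema(schema).attachValues("example")))
            .setRowSchema(schema)
            .apply(transform);

    // Successfully converted messages and conversion errors come out on separate tags.
    PCollection<PubsubMessage> messages = result.get(PubsubRowToMessage.OUTPUT);
    PCollection<Row> errors = result.get(PubsubRowToMessage.ERROR);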

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java
new file mode 100644
index 00000000000..9d7bbcf67c4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+
+/** Write-side {@link Row} to {@link PubsubMessage} converter. */
+@Internal
+@Experimental
+@AutoValue
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+abstract class PubsubRowToMessage extends PTransform<PCollection<Row>, PCollectionTuple> {
+
+  static Builder builder() {
+    return new AutoValue_PubsubRowToMessage.Builder();
+  }
+
+  static final TupleTag<PubsubMessage> OUTPUT = new TupleTag<PubsubMessage>() {};
+  static final TupleTag<Row> ERROR = new TupleTag<Row>() {};
+  static final String ERROR_DATA_FIELD_NAME = "data";
+  static final Field ERROR_MESSAGE_FIELD = Field.of("error_message", FieldType.STRING);
+  static final Field ERROR_STACK_TRACE_FIELD = Field.of("error_stack_trace", FieldType.STRING);
+
+  static final String DEFAULT_KEY_PREFIX = "$";
+  static final String ATTRIBUTES_KEY_NAME = "pubsub_attributes";
+  static final FieldType ATTRIBUTES_FIELD_TYPE = FieldType.map(FieldType.STRING, FieldType.STRING);
+
+  static final String EVENT_TIMESTAMP_KEY_NAME = "pubsub_event_timestamp";
+  static final FieldType EVENT_TIMESTAMP_FIELD_TYPE = FieldType.DATETIME;
+
+  static final String PAYLOAD_KEY_NAME = "pubsub_payload";
+  static final TypeName PAYLOAD_BYTES_TYPE_NAME = TypeName.BYTES;
+  static final TypeName PAYLOAD_ROW_TYPE_NAME = TypeName.ROW;
+
+  /** The prefix for all non-user fields. Defaults to {@link #DEFAULT_KEY_PREFIX}. */
+  abstract String getKeyPrefix();
+
+  /**
+   * The {@link PayloadSerializer} used to {@link PayloadSerializer#serialize(Row)} the payload or
+   * user fields {@link Row}.
+   */
+  @Nullable
+  abstract PayloadSerializer getPayloadSerializer();
+
+  /** The name of the timestamp attribute to apply to the {@link PubsubMessage}. */
+  @Nullable
+  abstract String getTargetTimestampAttributeName();
+
+  /**
+   * Used for testing to simplify assertions of the generated timestamp when the input lacks a
+   * timestamp field.
+   */
+  @Nullable
+  abstract Instant getMockInstant();
+
+  /** Generates {@link Schema} of the {@link #ERROR} {@link PCollection}. */
+  static Schema errorSchema(Schema inputSchema) {
+    Field dataField = Field.of(ERROR_DATA_FIELD_NAME, FieldType.row(inputSchema));
+    return Schema.of(dataField, ERROR_MESSAGE_FIELD, ERROR_STACK_TRACE_FIELD);
+  }
+
+  /**
+   * Convenience method that generates an {@link InputSchemaFactory} for the expected {@link Schema}
+   * of {@link Row} input into {@link PubsubRowToMessage}, excluding the {@link Field} for {@link
+   * #getPayloadKeyName()}. See {@link InputSchemaFactory#buildSchema(Field...)} for details on how
+   * to add additional fields.
+   */
+  InputSchemaFactory inputSchemaFactory() {
+    return inputSchemaFactory(null);
+  }
+
+  /**
+   * Convenience method that generates an {@link InputSchemaFactory} for the expected {@link Schema}
+   * of {@link Row} input into {@link PubsubRowToMessage}. The {@link Field} for {@link
+   * #getPayloadKeyName()} is excluded when {@code payloadFieldType} is null. See {@link
+   * InputSchemaFactory#buildSchema(Field...)} for details on how to add additional fields.
+   */
+  InputSchemaFactory inputSchemaFactory(@Nullable FieldType payloadFieldType) {
+    InputSchemaFactory.Builder builder =
+        InputSchemaFactory.builder()
+            .setAttributesField(Field.of(getAttributesKeyName(), ATTRIBUTES_FIELD_TYPE))
+            .setTimestampField(
+                Field.of(getSourceEventTimestampKeyName(), EVENT_TIMESTAMP_FIELD_TYPE));
+
+    if (payloadFieldType != null) {
+      builder = builder.setPayloadField(Field.of(getPayloadKeyName(), payloadFieldType));
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public PCollectionTuple expand(PCollection<Row> input) {
+    Schema schema = input.getSchema();
+    validate(schema);
+    Schema errorSchema = errorSchema(schema);
+    PCollectionTuple pct =
+        input.apply(
+            PubsubRowToMessage.class.getSimpleName(),
+            ParDo.of(
+                    new PubsubRowToMessageDoFn(
+                        getAttributesKeyName(),
+                        getSourceEventTimestampKeyName(),
+                        getPayloadKeyName(),
+                        errorSchema,
+                        getTargetTimestampAttributeName(),
+                        getMockInstant(),
+                        getPayloadSerializer()))
+                .withOutputTags(OUTPUT, TupleTagList.of(ERROR)));
+
+    PCollection<PubsubMessage> output = pct.get(OUTPUT);
+    PCollection<Row> error = pct.get(ERROR).setRowSchema(errorSchema);
+    return PCollectionTuple.of(OUTPUT, output).and(ERROR, error);
+  }
+
+  /** Returns the name of the source attributes key, prefixed with {@link #getKeyPrefix()}. */
+  String getAttributesKeyName() {
+    return getKeyPrefix() + ATTRIBUTES_KEY_NAME;
+  }
+
+  /** Returns the name of the source timestamp key, prefixed with {@link #getKeyPrefix()}. */
+  String getSourceEventTimestampKeyName() {
+    return getKeyPrefix() + EVENT_TIMESTAMP_KEY_NAME;
+  }
+
+  /** Returns the name of the source payload key, prefixed with {@link #getKeyPrefix()}. */
+  String getPayloadKeyName() {
+    return getKeyPrefix() + PAYLOAD_KEY_NAME;
+  }
+
+  /** Validates an input's {@link Schema} for correctness. */
+  void validate(Schema schema) {
+
+    if (schema.getFieldCount() == 0) {
+      throw new IllegalArgumentException(
+          String.format("Schema must contain at least one field. Schema: %s", schema));
+    }
+
+    validateAttributesField(schema);
+    validateSourceEventTimeStampField(schema);
+    validateSerializableFields(schema);
+  }
+
+  /**
+   * Validates an input {@link Schema}'s {@link #getAttributesKeyName()} field for correctness, if
+   * it exists.
+   */
+  void validateAttributesField(Schema schema) {
+    String attributesKeyName = getAttributesKeyName();
+    if (!schema.hasField(attributesKeyName)) {
+      return;
+    }
+    checkArgument(
+        SchemaReflection.of(schema)
+            .matchesAll(FieldMatcher.of(attributesKeyName, ATTRIBUTES_FIELD_TYPE)));
+  }
+
+  /**
+   * Validates an input {@link Schema}'s {@link #getSourceEventTimestampKeyName()} field for
+   * correctness, if it exists.
+   */
+  void validateSourceEventTimeStampField(Schema schema) {
+    String eventTimestampKeyName = getSourceEventTimestampKeyName();
+    if (!schema.hasField(eventTimestampKeyName)) {
+      return;
+    }
+    checkArgument(
+        SchemaReflection.of(schema)
+            .matchesAll(FieldMatcher.of(eventTimestampKeyName, EVENT_TIMESTAMP_FIELD_TYPE)));
+  }
+
+  /**
+   * Validates an input's {@link Schema} for the correctness of either the {@link
+   * #getPayloadKeyName()} field or user fields. Additionally, it validates the {@link
+   * #getPayloadSerializer()} null state based on the {@link #getPayloadKeyName()} {@link FieldType}
+   * or the presence of user fields.
+   */
+  void validateSerializableFields(Schema schema) {
+    String attributesKeyName = getAttributesKeyName();
+    String eventTimestampKeyName = getSourceEventTimestampKeyName();
+    String payloadKeyName = getPayloadKeyName();
+    Schema withUserFieldsOnly =
+        removeFields(schema, attributesKeyName, eventTimestampKeyName, payloadKeyName);
+    boolean hasUserFields = withUserFieldsOnly.getFieldCount() > 0;
+    String withUserFieldsList = String.join(", ", withUserFieldsOnly.getFieldNames());
+    SchemaReflection schemaReflection = SchemaReflection.of(schema);
+    boolean hasPayloadField = schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName));
+    boolean hasPayloadRowField =
+        schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_ROW_TYPE_NAME));
+    boolean hasPayloadBytesField =
+        schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_BYTES_TYPE_NAME));
+    boolean hasBothUserFieldsAndPayloadField = hasUserFields && hasPayloadField;
+
+    checkArgument(
+        hasUserFields || hasPayloadField,
+        String.format(
+            "schema must have either a %s field or user fields i.e. not %s, %s or %s",
+            payloadKeyName, attributesKeyName, eventTimestampKeyName, payloadKeyName));
+
+    checkArgument(
+        !hasBothUserFieldsAndPayloadField,
+        String.format(
+            "schema field: %s incompatible with %s fields", payloadKeyName, withUserFieldsList));
+
+    if (hasPayloadBytesField) {
+      checkArgument(
+          getPayloadSerializer() == null,
+          String.format(
+              "schema field: %s of type: %s with a %s is incompatible",
+              payloadKeyName, PAYLOAD_BYTES_TYPE_NAME, PayloadSerializer.class.getName()));
+    }
+
+    if (hasPayloadRowField) {
+      checkArgument(
+          getPayloadSerializer() != null,
+          String.format(
+              "schema field: %s of type: %s requires a %s",
+              payloadKeyName, PAYLOAD_ROW_TYPE_NAME, PayloadSerializer.class.getName()));
+    }
+
+    if (hasUserFields) {
+      checkArgument(
+          getPayloadSerializer() != null,
+          String.format(
+              "specifying schema fields: %s requires a %s",
+              withUserFieldsList, PayloadSerializer.class.getName()));
+    }
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setKeyPrefix(String value);
+
+    abstract Optional<String> getKeyPrefix();
+
+    abstract Builder setPayloadSerializer(PayloadSerializer value);
+
+    abstract Builder setTargetTimestampAttributeName(String value);
+
+    abstract Builder setMockInstant(Instant value);
+
+    abstract PubsubRowToMessage autoBuild();
+
+    final PubsubRowToMessage build() {
+      if (!getKeyPrefix().isPresent()) {
+        setKeyPrefix(DEFAULT_KEY_PREFIX);
+      }
+      return autoBuild();
+    }
+  }
+
+  /** {@link SchemaReflection} is a helper class for reflecting fields of a {@link Schema}. */
+  static class SchemaReflection {
+    static SchemaReflection of(Schema schema) {
+      return new SchemaReflection(schema);
+    }
+
+    private final Schema schema;
+
+    private SchemaReflection(Schema schema) {
+      this.schema = schema;
+    }
+
+    /** Returns true if all {@code fieldMatchers} match the {@link Schema}, per {@link FieldMatcher#match(Schema)}. */
+    boolean matchesAll(FieldMatcher... fieldMatchers) {
+      for (FieldMatcher fieldMatcher : fieldMatchers) {
+        if (!fieldMatcher.match(schema)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /** {@link FieldMatcher} matches fields in a {@link Schema}. */
+  static class FieldMatcher {
+    static FieldMatcher of(String name) {
+      return new FieldMatcher(name);
+    }
+
+    static FieldMatcher of(String name, TypeName typeName) {
+      return new FieldMatcher(name, typeName);
+    }
+
+    static FieldMatcher of(String name, FieldType fieldType) {
+      return new FieldMatcher(name, fieldType);
+    }
+
+    private final String name;
+
+    @Nullable private final TypeName typeName;
+
+    @Nullable private final FieldType fieldType;
+
+    private FieldMatcher(String name, @Nullable TypeName typeName, @Nullable FieldType fieldType) {
+      this.name = name;
+      this.typeName = typeName;
+      this.fieldType = fieldType;
+    }
+
+    private FieldMatcher(String name) {
+      this(name, null, null);
+    }
+
+    private FieldMatcher(String name, TypeName typeName) {
+      this(name, typeName, null);
+    }
+
+    private FieldMatcher(String name, FieldType fieldType) {
+      this(name, null, fieldType);
+    }
+
+    /** Returns true when the {@link Field} in a {@link Schema} matches its search criteria. */
+    boolean match(Schema schema) {
+      if (!schema.hasField(name)) {
+        return false;
+      }
+      if (typeName == null && fieldType == null) {
+        return true;
+      }
+      Field field = schema.getField(name);
+      if (typeName != null) {
+        return field.getType().getTypeName().equals(typeName);
+      }
+      return fieldType.equals(field.getType());
+    }
+  }
+
+  /** Removes fields from a {@link Schema}. */
+  static Schema removeFields(Schema schema, String... fields) {
+    List<String> exclude = Arrays.stream(fields).collect(Collectors.toList());
+    Schema.Builder builder = Schema.builder();
+    for (Field field : schema.getFields()) {
+      if (exclude.contains(field.getName())) {
+        continue;
+      }
+      builder.addField(field);
+    }
+    return builder.build();
+  }
+
+  /** A {@link DoFn} that converts a {@link Row} to a {@link PubsubMessage}. */
+  static class PubsubRowToMessageDoFn extends DoFn<Row, PubsubMessage> {
+
+    private final String attributesKeyName;
+    private final String sourceTimestampKeyName;
+    private final String payloadKeyName;
+    private final Schema errorSchema;
+
+    @Nullable private final String targetTimestampKeyName;
+    @Nullable private final PayloadSerializer payloadSerializer;
+
+    @Nullable private final Instant mockInstant;
+
+    PubsubRowToMessageDoFn(
+        String attributesKeyName,
+        String sourceTimestampKeyName,
+        String payloadKeyName,
+        Schema errorSchema,
+        @Nullable String targetTimestampKeyName,
+        @Nullable Instant mockInstant,
+        @Nullable PayloadSerializer payloadSerializer) {
+      this.attributesKeyName = attributesKeyName;
+      this.sourceTimestampKeyName = sourceTimestampKeyName;
+      this.payloadKeyName = payloadKeyName;
+      this.errorSchema = errorSchema;
+      this.targetTimestampKeyName = targetTimestampKeyName;
+      this.payloadSerializer = payloadSerializer;
+      this.mockInstant = mockInstant;
+    }
+
+    @ProcessElement
+    public void process(@Element Row row, MultiOutputReceiver receiver) {
+      try {
+
+        Map<String, String> attributesWithoutTimestamp = this.attributesWithoutTimestamp(row);
+        String timestampAsString = this.timestampAsString(row);
+        String timestampKeyName = sourceTimestampKeyName;
+        if (targetTimestampKeyName != null) {
+          timestampKeyName = targetTimestampKeyName;
+        }
+        byte[] payload = this.payload(row);
+        HashMap<String, String> attributes = new HashMap<>(attributesWithoutTimestamp);
+        attributes.put(timestampKeyName, timestampAsString);
+        PubsubMessage message = new PubsubMessage(payload, attributes);
+        receiver.get(OUTPUT).output(message);
+
+      } catch (Exception e) {
+
+        String message = e.getMessage();
+        String stackTrace = Throwables.getStackTraceAsString(e);
+        Row error =
+            Row.withSchema(errorSchema)
+                .withFieldValue(ERROR_DATA_FIELD_NAME, row)
+                .withFieldValue(ERROR_MESSAGE_FIELD.getName(), message)
+                .withFieldValue(ERROR_STACK_TRACE_FIELD.getName(), stackTrace)
+                .build();
+
+        receiver.get(ERROR).output(error);
+      }
+    }
+
+    /**
+     * Extracts the {@code Map<String, String>} attributes from a {@link Row} that contains the
+     * {@link #attributesKeyName}; returns an empty {@link Map} if the field is absent.
+     */
+    Map<String, String> attributesWithoutTimestamp(Row row) {
+      if (!row.getSchema().hasField(attributesKeyName)) {
+        return new HashMap<>();
+      }
+      return row.getMap(attributesKeyName);
+    }
+
+    /**
+     * Outputs the {@link #timestamp(Row)} as a String in RFC 3339 format. For example, {@code
+     * 2015-10-29T23:41:41.123Z}.
+     */
+    String timestampAsString(Row row) {
+      return timestamp(row).toString();
+    }
+
+    /**
+     * Extracts a {@link ReadableDateTime} from a {@link Row} containing a {@link
+     * #sourceTimestampKeyName}. If the {@link Field} is missing, returns {@link Instant#now()}, or
+     * {@link #mockInstant} when set.
+     */
+    ReadableDateTime timestamp(Row row) {
+      if (row.getSchema().hasField(sourceTimestampKeyName)) {
+        return row.getDateTime(sourceTimestampKeyName);
+      }
+      Instant instant = Instant.now();
+      if (mockInstant != null) {
+        instant = mockInstant;
+      }
+      return new DateTime(instant).withZone(instant.getZone());
+    }
+
+    /**
+     * Extracts a {@code byte[]} payload from a {@link Row}, from one of the following mutually
+     * exclusive sources. <br>
+     * - {@link #payloadKeyName} {@link Field} with {@link FieldType#BYTES} <br>
+     * - serialized {@link #payloadKeyName} {@link Field} with {@link TypeName#ROW} using the {@link
+     * #payloadSerializer} <br>
+     * - serialized user fields, i.e. fields other than {@link #attributesKeyName} and {@link
+     * #sourceTimestampKeyName}
+     */
+    byte[] payload(Row row) {
+      if (SchemaReflection.of(row.getSchema())
+          .matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_BYTES_TYPE_NAME))) {
+        return row.getBytes(payloadKeyName);
+      }
+      return Objects.requireNonNull(payloadSerializer).serialize(serializableRow(row));
+    }
+
+    /**
+     * Extracts the serializable part of a {@link Row} from one of the following mutually exclusive
+     * sources. <br>
+     * - the {@link #payloadKeyName} {@link Field} with {@link TypeName#ROW} <br>
+     * - user fields, i.e. fields other than {@link #attributesKeyName} and {@link
+     * #sourceTimestampKeyName}
+     */
+    Row serializableRow(Row row) {
+      SchemaReflection schemaReflection = SchemaReflection.of(row.getSchema());
+
+      if (schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_BYTES_TYPE_NAME))) {
+        throw new IllegalArgumentException(
+            String.format(
+                "serializable Row does not exist for payload of type: %s",
+                PAYLOAD_BYTES_TYPE_NAME));
+      }
+
+      if (schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_ROW_TYPE_NAME))) {
+        return row.getRow(payloadKeyName);
+      }
+      Schema withUserFieldsOnly =
+          removeFields(row.getSchema(), attributesKeyName, sourceTimestampKeyName);
+      Map<String, Object> values = new HashMap<>();
+      for (String name : withUserFieldsOnly.getFieldNames()) {
+        values.put(name, row.getValue(name));
+      }
+      return Row.withSchema(withUserFieldsOnly).withFieldValues(values).build();
+    }
+  }
+
+  /**
+   * A convenience class for generating the expected {@link Schema} for {@link Row} input of {@link
+   * PubsubRowToMessage}.
+   */
+  @AutoValue
+  abstract static class InputSchemaFactory {
+    static Builder builder() {
+      return new AutoValue_PubsubRowToMessage_InputSchemaFactory.Builder();
+    }
+
+    abstract Field getAttributesField();
+
+    abstract Field getTimestampField();
+
+    @Nullable
+    abstract Field getPayloadField();
+
+    /**
+     * Builds a {@link Schema} from {@link #getAttributesField()}, {@link #getTimestampField()},
+     * {@link #getPayloadField()} when set, and {@code additionalFields}. Users are encouraged to
+     * use the {@link #removeFields(Schema, String...)} method to customize the resulting {@link
+     * Schema}.
+     */
+    Schema buildSchema(Field... additionalFields) {
+      Schema.Builder builder =
+          Schema.builder().addField(getAttributesField()).addField(getTimestampField());
+
+      if (getPayloadField() != null) {
+        builder = builder.addField(getPayloadField());
+      }
+
+      for (Field field : additionalFields) {
+        builder = builder.addField(field);
+      }
+
+      return builder.build();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setAttributesField(Field value);
+
+      abstract Builder setTimestampField(Field value);
+
+      abstract Builder setPayloadField(Field value);
+
+      abstract InputSchemaFactory build();
+    }
+  }
+}
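
For reference, a short sketch of deriving the expected input Schema with the InputSchemaFactory
added above; the field names shown follow from the default "$" key prefix, and everything else here
is an illustrative assumption rather than part of the commit.

    PubsubRowToMessage transform = PubsubRowToMessage.builder().build();

    // Non-user fields plus a BYTES payload column:
    //   $pubsub_attributes      MAP<STRING, STRING>
    //   $pubsub_event_timestamp DATETIME
    //   $pubsub_payload         BYTES
    Schema withBytesPayload =
        transform.inputSchemaFactory(Schema.FieldType.BYTES).buildSchema();

    // Without a payload field, user fields can be appended instead:
    Schema withUserFields =
        transform
            .inputSchemaFactory()
            .buildSchema(Schema.Field.of("user_field", Schema.FieldType.STRING));
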
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java
new file mode 100644
index 00000000000..5ef8bc473e9
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java
@@ -0,0 +1,1085 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_KEY_NAME;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_KEY_PREFIX;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ERROR;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_KEY_NAME;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.OUTPUT;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_KEY_NAME;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.errorSchema;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.removeFields;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.FieldMatcher;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PubsubRowToMessageDoFn;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.SchemaReflection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PubsubRowToMessage}. */
+@RunWith(JUnit4.class)
+public class PubsubRowToMessageTest {
+
+  private static final PipelineOptions PIPELINE_OPTIONS = PipelineOptionsFactory.create();
+  private static final String DEFAULT_ATTRIBUTES_KEY_NAME =
+      DEFAULT_KEY_PREFIX + ATTRIBUTES_KEY_NAME;
+  private static final String DEFAULT_EVENT_TIMESTAMP_KEY_NAME =
+      DEFAULT_KEY_PREFIX + EVENT_TIMESTAMP_KEY_NAME;
+  private static final String DEFAULT_PAYLOAD_KEY_NAME = DEFAULT_KEY_PREFIX + PAYLOAD_KEY_NAME;
+  private static final Field BOOLEAN_FIELD = Field.of("boolean", FieldType.BOOLEAN);
+  private static final Field BYTE_FIELD = Field.of("byte", FieldType.BYTE);
+  private static final Field DATETIME_FIELD = Field.of("datetime", FieldType.DATETIME);
+  private static final Field DECIMAL_FIELD = Field.of("decimal", FieldType.DECIMAL);
+  private static final Field DOUBLE_FIELD = Field.of("double", FieldType.DOUBLE);
+  private static final Field FLOAT_FIELD = Field.of("float", FieldType.FLOAT);
+  private static final Field INT16_FIELD = Field.of("int16", FieldType.INT16);
+  private static final Field INT32_FIELD = Field.of("int32", FieldType.INT32);
+  private static final Field INT64_FIELD = Field.of("int64", FieldType.INT64);
+  private static final Field STRING_FIELD = Field.of("string", FieldType.STRING);
+  private static final Schema ALL_DATA_TYPES_SCHEMA =
+      Schema.of(
+          BOOLEAN_FIELD,
+          BYTE_FIELD,
+          DATETIME_FIELD,
+          DECIMAL_FIELD,
+          DOUBLE_FIELD,
+          FLOAT_FIELD,
+          INT16_FIELD,
+          INT32_FIELD,
+          INT64_FIELD,
+          STRING_FIELD);
+
+  private static final Schema NON_USER_WITH_BYTES_PAYLOAD =
+      Schema.of(
+          Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+          Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE),
+          Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES));
+
+  private static final Schema NON_USER_WITH_ROW_PAYLOAD =
+      Schema.of(
+          Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+          Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE),
+          Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA)));
+
+  private static final Schema NON_USER_WITHOUT_PAYLOAD =
+      Schema.of(
+          Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+          Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE));
+
+  static {
+    PIPELINE_OPTIONS.setStableUniqueNames(CheckEnabled.OFF);
+  }
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline errorsTestPipeline =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExpandThrowsExceptions() {
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    Instant embeddedTimestamp = Instant.now();
+    Instant mockTimestamp = Instant.ofEpochMilli(embeddedTimestamp.getMillis() + 1000000L);
+
+    Field attributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
+    Schema withAttributesSchema = Schema.of(attributesField);
+    Row withAttributes =
+        Row.withSchema(withAttributesSchema).attachValues(ImmutableMap.of("aaa", "111"));
+
+    Field timestampField = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
+    Schema withTimestampSchema = Schema.of(timestampField);
+    Row withEmbeddedTimestamp = Row.withSchema(withTimestampSchema).attachValues(embeddedTimestamp);
+
+    Field payloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
+    Schema withPayloadBytesSchema = Schema.of(payloadBytesField);
+    Row withPayloadBytes =
+        Row.withSchema(withPayloadBytesSchema).attachValues(new byte[] {1, 2, 3, 4});
+
+    Row withAllDataTypes =
+        rowWithAllDataTypes(
+            true,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(1L),
+            3.12345,
+            4.1f,
+            (short) 5,
+            2,
+            7L,
+            "asdfjkl;");
+
+    Field payloadRowField =
+        Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA));
+    Schema withPayloadRowSchema = Schema.of(payloadRowField);
+    Row withPayloadRow = Row.withSchema(withPayloadRowSchema).attachValues(withAllDataTypes);
+
+    PubsubRowToMessage transformWithoutSerializer =
+        PubsubRowToMessage.builder().setMockInstant(mockTimestamp).build();
+
+    PubsubRowToMessage transformWithSerializer =
+        PubsubRowToMessage.builder().setPayloadSerializer(jsonPayloadSerializer).build();
+
+    // requires either payload or user fields
+    errorsTestPipeline
+        .apply(Create.of(withAttributes))
+        .setRowSchema(withAttributesSchema)
+        .apply(transformWithoutSerializer);
+
+    // requires either payload or user fields
+    errorsTestPipeline
+        .apply(Create.of(withEmbeddedTimestamp))
+        .setRowSchema(withTimestampSchema)
+        .apply(transformWithoutSerializer);
+
+    // payload bytes incompatible with non-null payload serializer
+    errorsTestPipeline
+        .apply(Create.of(withPayloadBytes))
+        .setRowSchema(withPayloadBytesSchema)
+        .apply(transformWithSerializer);
+
+    // payload row requires payload serializer
+    errorsTestPipeline
+        .apply(Create.of(withPayloadRow))
+        .setRowSchema(withPayloadRowSchema)
+        .apply(transformWithoutSerializer);
+
+    // user fields row requires payload serializer
+    errorsTestPipeline
+        .apply(Create.of(withAllDataTypes))
+        .setRowSchema(ALL_DATA_TYPES_SCHEMA)
+        .apply(transformWithoutSerializer);
+
+    // input with both payload row and user fields incompatible
+    errorsTestPipeline
+        .apply(Create.of(merge(withPayloadRow, withAllDataTypes)))
+        .setRowSchema(merge(withPayloadRowSchema, ALL_DATA_TYPES_SCHEMA))
+        .apply(transformWithSerializer);
+  }
+
+  @Test
+  public void testExpandError() {
+    PayloadSerializer mismatchedSerializerProviderSchema =
+        new JsonPayloadSerializerProvider()
+            .getSerializer(Schema.of(STRING_FIELD), ImmutableMap.of());
+
+    Row withAllDataTypes =
+        rowWithAllDataTypes(
+            true,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(1L),
+            3.12345,
+            4.1f,
+            (short) 5,
+            2,
+            7L,
+            "asdfjkl;");
+
+    PubsubRowToMessage transformWithSerializer =
+        PubsubRowToMessage.builder()
+            .setPayloadSerializer(mismatchedSerializerProviderSchema)
+            .build();
+
+    PAssert.that(
+            pipeline
+                .apply(Create.of(withAllDataTypes))
+                .setRowSchema(ALL_DATA_TYPES_SCHEMA)
+                .apply(transformWithSerializer)
+                .get(ERROR)
+                .apply(Count.globally()))
+        .containsInAnyOrder(1L);
+
+    pipeline.run(PIPELINE_OPTIONS);
+  }
+
+  @Test
+  public void testExpandOutput() {
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    Instant embeddedTimestamp = Instant.now();
+    Instant mockTimestamp = Instant.ofEpochMilli(embeddedTimestamp.getMillis() + 1000000L);
+    String embeddedTimestampString = embeddedTimestamp.toString();
+    String mockTimestampString = mockTimestamp.toString();
+
+    byte[] bytes =
+        "All the 🌎's a 🎦, and all the 🚹 and 🚺 merely players. They 🈷️their 🚪and their 🎟️and 1️⃣🚹in his time ▶️many🤺🪂🏆"
+            .getBytes(StandardCharsets.UTF_8);
+
+    Field attributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
+    Field timestampField = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
+    Field payloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
+    Field payloadRowField =
+        Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA));
+
+    Row withAllDataTypes =
+        rowWithAllDataTypes(
+            true,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(1L),
+            3.12345,
+            4.1f,
+            (short) 5,
+            2,
+            7L,
+            "asdfjkl;");
+
+    byte[] serializedWithAllDataTypes = jsonPayloadSerializer.serialize(withAllDataTypes);
+
+    Schema withAttributesSchema = Schema.of(attributesField);
+    Row withAttributes =
+        Row.withSchema(withAttributesSchema).attachValues(ImmutableMap.of("aaa", "111"));
+
+    Schema withTimestampSchema = Schema.of(timestampField);
+    Row withEmbeddedTimestamp = Row.withSchema(withTimestampSchema).attachValues(embeddedTimestamp);
+
+    Schema withPayloadBytesSchema = Schema.of(payloadBytesField);
+    Row withPayloadBytes = Row.withSchema(withPayloadBytesSchema).attachValues(bytes);
+
+    Schema withPayloadRowSchema = Schema.of(payloadRowField);
+    Row withPayloadRow = Row.withSchema(withPayloadRowSchema).attachValues(withAllDataTypes);
+
+    PubsubRowToMessage transformWithoutSerializer =
+        PubsubRowToMessage.builder().setMockInstant(mockTimestamp).build();
+
+    PubsubRowToMessage transformWithSerializer =
+        PubsubRowToMessage.builder()
+            .setMockInstant(mockTimestamp)
+            .setPayloadSerializer(jsonPayloadSerializer)
+            .build();
+
+    PCollection<Row> a0t0p0u1 =
+        pipeline.apply(Create.of(withAllDataTypes)).setRowSchema(ALL_DATA_TYPES_SCHEMA);
+    PubsubMessage a0t0p0u1Message =
+        new PubsubMessage(
+            serializedWithAllDataTypes,
+            ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+    PAssert.that(
+            "expected: payload: user serialized row, attributes: ParDo mocked generated timestamp only",
+            a0t0p0u1.apply(transformWithSerializer).get(OUTPUT))
+        .containsInAnyOrder(a0t0p0u1Message);
+
+    PCollection<Row> a1t0p0u1 =
+        pipeline
+            .apply(Create.of(merge(withAttributes, withAllDataTypes)))
+            .setRowSchema(merge(withAttributesSchema, ALL_DATA_TYPES_SCHEMA));
+    PubsubMessage a1t0p0u1Message =
+        new PubsubMessage(
+            serializedWithAllDataTypes,
+            ImmutableMap.of("aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+    PAssert.that(
+            "expected: payload: user serialized row, attributes: input attributes and ParDo generated timestamp",
+            a1t0p0u1.apply(transformWithSerializer).get(OUTPUT))
+        .containsInAnyOrder(a1t0p0u1Message);
+
+    PCollection<Row> a0t1p0u1 =
+        pipeline
+            .apply(Create.of(merge(withEmbeddedTimestamp, withAllDataTypes)))
+            .setRowSchema(merge(withTimestampSchema, ALL_DATA_TYPES_SCHEMA));
+    PubsubMessage a0t1p0u1Message =
+        new PubsubMessage(
+            serializedWithAllDataTypes,
+            ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+    PAssert.that(
+            "expected: payload: user serialized row, attributes: timestamp from row input",
+            a0t1p0u1.apply(transformWithSerializer).get(OUTPUT))
+        .containsInAnyOrder(a0t1p0u1Message);
+
+    PCollection<Row> a0t0p1bu0 =
+        pipeline.apply(Create.of(withPayloadBytes)).setRowSchema(withPayloadBytesSchema);
+    PubsubMessage a0t0p1bu0Message =
+        new PubsubMessage(
+            bytes, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+    PAssert.that(
+            "expected: payload: raw bytes, attributes: ParDo generated timestamp",
+            a0t0p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+        .containsInAnyOrder(a0t0p1bu0Message);
+
+    PCollection<Row> a0t0p1ru0 =
+        pipeline.apply(Create.of(withPayloadRow)).setRowSchema(withPayloadRowSchema);
+    PubsubMessage a0t0p1ru0Message =
+        new PubsubMessage(
+            serializedWithAllDataTypes,
+            ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+    PAssert.that(
+            "expected: payload: non-user row, attributes: ParDo generated timestamp",
+            a0t0p1ru0.apply(transformWithSerializer).get(OUTPUT))
+        .containsInAnyOrder(a0t0p1ru0Message);
+
+    PCollection<Row> a1t0p1bu0 =
+        pipeline
+            .apply(Create.of(merge(withAttributes, withPayloadBytes)))
+            .setRowSchema(merge(withAttributesSchema, withPayloadBytesSchema));
+    PubsubMessage a1t0p1bu0Message =
+        new PubsubMessage(
+            bytes,
+            ImmutableMap.of("aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+    PAssert.that(
+            "expected: raw bytes, attributes: embedded attributes and ParDo generated timestamp",
+            a1t0p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+        .containsInAnyOrder(a1t0p1bu0Message);
+
+    PCollection<Row> a0t1p1bu0 =
+        pipeline
+            .apply(Create.of(merge(withEmbeddedTimestamp, withPayloadBytes)))
+            .setRowSchema(merge(withTimestampSchema, withPayloadBytesSchema));
+    PubsubMessage a0t1p1bu0Message =
+        new PubsubMessage(
+            bytes, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+    PAssert.that(
+            "expected: payload: raw bytes, attributes: timestamp from row input",
+            a0t1p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+        .containsInAnyOrder(a0t1p1bu0Message);
+
+    PCollection<Row> a1t1p1bu0 =
+        pipeline
+            .apply(Create.of(merge(merge(withAttributes, withEmbeddedTimestamp), withPayloadBytes)))
+            .setRowSchema(
+                merge(merge(withAttributesSchema, withTimestampSchema), withPayloadBytesSchema));
+    PubsubMessage a1t1p1bu0Message =
+        new PubsubMessage(
+            bytes,
+            ImmutableMap.of(
+                "aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+    PAssert.that(
+            "expected: payload: raw bytes, attributes: from row input including its embedded timestamp",
+            a1t1p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+        .containsInAnyOrder(a1t1p1bu0Message);
+
+    PCollection<Row> a1t1p1ru0 =
+        pipeline
+            .apply(Create.of(merge(merge(withAttributes, withEmbeddedTimestamp), withPayloadRow)))
+            .setRowSchema(
+                merge(merge(withAttributesSchema, withTimestampSchema), withPayloadRowSchema));
+    PubsubMessage a1t1p1ru0Message =
+        new PubsubMessage(
+            serializedWithAllDataTypes,
+            ImmutableMap.of(
+                "aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+    PAssert.that(
+            "expected: payload: serialized row payload input, attributes: from row input including its embedded timestamp",
+            a1t1p1ru0.apply(transformWithSerializer).get(OUTPUT))
+        .containsInAnyOrder(a1t1p1ru0Message);
+
+    PCollection<Row> a1t1p0u1 =
+        pipeline
+            .apply(Create.of(merge(merge(withAttributes, withEmbeddedTimestamp), withAllDataTypes)))
+            .setRowSchema(
+                merge(merge(withAttributesSchema, withTimestampSchema), ALL_DATA_TYPES_SCHEMA));
+    PubsubMessage a1t1p0u1Message =
+        new PubsubMessage(
+            serializedWithAllDataTypes,
+            ImmutableMap.of(
+                "aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+    PAssert.that(
+            "expected: payload: serialized user fields, attributes: from row input including embedded timestamp",
+            a1t1p0u1.apply(transformWithSerializer).get(OUTPUT))
+        .containsInAnyOrder(a1t1p0u1Message);
+
+    pipeline.run(PIPELINE_OPTIONS);
+  }
+
+  @Test
+  public void testKeyPrefix() {
+    PubsubRowToMessage withDefaultKeyPrefix = PubsubRowToMessage.builder().build();
+    assertEquals(DEFAULT_ATTRIBUTES_KEY_NAME, withDefaultKeyPrefix.getAttributesKeyName());
+    assertEquals(
+        DEFAULT_EVENT_TIMESTAMP_KEY_NAME, withDefaultKeyPrefix.getSourceEventTimestampKeyName());
+    assertEquals(DEFAULT_PAYLOAD_KEY_NAME, withDefaultKeyPrefix.getPayloadKeyName());
+
+    PubsubRowToMessage withKeyPrefixOverride =
+        PubsubRowToMessage.builder().setKeyPrefix("_").build();
+    assertEquals("_" + ATTRIBUTES_KEY_NAME, withKeyPrefixOverride.getAttributesKeyName());
+    assertEquals(
+        "_" + EVENT_TIMESTAMP_KEY_NAME, withKeyPrefixOverride.getSourceEventTimestampKeyName());
+    assertEquals("_" + PAYLOAD_KEY_NAME, withKeyPrefixOverride.getPayloadKeyName());
+  }
+
+  @Test
+  public void testSetTargetTimestampAttributeName() {
+    Instant mockTimestamp = Instant.now();
+    String mockTimestampString = mockTimestamp.toString();
+    byte[] bytes = new byte[] {1, 2, 3, 4};
+
+    Field payloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
+
+    Schema withPayloadBytesSchema = Schema.of(payloadBytesField);
+    Row withPayloadBytes = Row.withSchema(withPayloadBytesSchema).attachValues(bytes);
+
+    String customTargetTimestampAttributeName = "custom_timestamp_key";
+
+    PubsubRowToMessage withoutSetTargetTimestampAttributeName =
+        PubsubRowToMessage.builder().setMockInstant(mockTimestamp).build();
+    PubsubRowToMessage withSetTargetTimestampAttributeName =
+        PubsubRowToMessage.builder()
+            .setMockInstant(mockTimestamp)
+            .setTargetTimestampAttributeName(customTargetTimestampAttributeName)
+            .build();
+
+    PCollection<Row> input =
+        pipeline.apply(Create.of(withPayloadBytes)).setRowSchema(Schema.of(payloadBytesField));
+
+    PAssert.that(input.apply(withoutSetTargetTimestampAttributeName).get(OUTPUT))
+        .containsInAnyOrder(
+            new PubsubMessage(
+                bytes, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString)));
+
+    PAssert.that(input.apply(withSetTargetTimestampAttributeName).get(OUTPUT))
+        .containsInAnyOrder(
+            new PubsubMessage(
+                bytes, ImmutableMap.of(customTargetTimestampAttributeName, mockTimestampString)));
+
+    pipeline.run(PIPELINE_OPTIONS);
+  }
+
+  @Test
+  public void testInputSchemaFactory() {
+    assertEquals(
+        Schema.of(
+            Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+            Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE)),
+        PubsubRowToMessage.builder().build().inputSchemaFactory().buildSchema());
+
+    assertEquals(
+        Schema.of(
+            Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+            Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE),
+            Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES)),
+        PubsubRowToMessage.builder().build().inputSchemaFactory(FieldType.BYTES).buildSchema());
+
+    assertEquals(
+        Schema.of(
+            Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+            Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE),
+            Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA))),
+        PubsubRowToMessage.builder()
+            .build()
+            .inputSchemaFactory(FieldType.row(ALL_DATA_TYPES_SCHEMA))
+            .buildSchema());
+
+    String prefix = "_";
+    assertEquals(
+        Schema.of(
+            Field.of(prefix + ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+            Field.of(prefix + EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE)),
+        PubsubRowToMessage.builder()
+            .setKeyPrefix(prefix)
+            .build()
+            .inputSchemaFactory()
+            .buildSchema());
+
+    Field[] userFields = ALL_DATA_TYPES_SCHEMA.getFields().toArray(new Field[0]);
+    assertEquals(
+        merge(
+            Schema.of(
+                Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+                Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE)),
+            ALL_DATA_TYPES_SCHEMA),
+        PubsubRowToMessage.builder().build().inputSchemaFactory().buildSchema(userFields));
+  }
+
+  @Test
+  public void testValidate() {
+    PubsubRowToMessage pubsubRowToMessage = PubsubRowToMessage.builder().build();
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> pubsubRowToMessage.validate(Schema.builder().build()));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validate(
+                Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.STRING))));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validate(
+                Schema.of(
+                    Field.of(
+                        DEFAULT_ATTRIBUTES_KEY_NAME,
+                        FieldType.map(FieldType.STRING, FieldType.BYTES)))));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validate(
+                Schema.of(
+                    Field.of(
+                        DEFAULT_ATTRIBUTES_KEY_NAME,
+                        FieldType.map(FieldType.BYTES, FieldType.STRING)))));
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validate(
+                Schema.of(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, FieldType.STRING))));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> PubsubRowToMessage.builder().build().validate(ALL_DATA_TYPES_SCHEMA));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> PubsubRowToMessage.builder().build().validate(NON_USER_WITH_ROW_PAYLOAD));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .setPayloadSerializer(jsonPayloadSerializer)
+                .build()
+                .validate(NON_USER_WITH_BYTES_PAYLOAD));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .build()
+                .validate(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_BYTES_PAYLOAD)));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .build()
+                .validate(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_ROW_PAYLOAD)));
+  }
+
+  @Test
+  public void testValidateAttributesField() {
+    PubsubRowToMessage pubsubRowToMessage = PubsubRowToMessage.builder().build();
+    pubsubRowToMessage.validateAttributesField(ALL_DATA_TYPES_SCHEMA);
+    pubsubRowToMessage.validateAttributesField(
+        Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE)));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validateAttributesField(
+                Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.STRING))));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validateAttributesField(
+                Schema.of(
+                    Field.of(
+                        DEFAULT_ATTRIBUTES_KEY_NAME,
+                        FieldType.map(FieldType.STRING, FieldType.BYTES)))));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validateAttributesField(
+                Schema.of(
+                    Field.of(
+                        DEFAULT_ATTRIBUTES_KEY_NAME,
+                        FieldType.map(FieldType.BYTES, FieldType.STRING)))));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder().build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA));
+  }
+
+  @Test
+  public void testValidateSourceEventTimeStampField() {
+    PubsubRowToMessage pubsubRowToMessage = PubsubRowToMessage.builder().build();
+    pubsubRowToMessage.validateSourceEventTimeStampField(ALL_DATA_TYPES_SCHEMA);
+    pubsubRowToMessage.validateSourceEventTimeStampField(
+        Schema.of(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE)));
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            pubsubRowToMessage.validateSourceEventTimeStampField(
+                Schema.of(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, FieldType.STRING))));
+  }
+
+  @Test
+  public void testShouldValidatePayloadSerializerNullState() {
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+    PayloadSerializer avroPayloadSerializer =
+        new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    PubsubRowToMessage.builder()
+        .setPayloadSerializer(jsonPayloadSerializer)
+        .build()
+        .validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
+
+    PubsubRowToMessage.builder()
+        .setPayloadSerializer(avroPayloadSerializer)
+        .build()
+        .validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
+
+    PubsubRowToMessage.builder()
+        .setPayloadSerializer(jsonPayloadSerializer)
+        .build()
+        .validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD);
+
+    PubsubRowToMessage.builder()
+        .setPayloadSerializer(avroPayloadSerializer)
+        .build()
+        .validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD);
+
+    PubsubRowToMessage.builder().build().validateSerializableFields(NON_USER_WITH_BYTES_PAYLOAD);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder().build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .build()
+                .validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .setPayloadSerializer(jsonPayloadSerializer)
+                .build()
+                .validateSerializableFields(NON_USER_WITH_BYTES_PAYLOAD));
+  }
+
+  @Test
+  public void testShouldEitherHavePayloadOrUserFields() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .build()
+                .validateSerializableFields(
+                    Schema.of(
+                        Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE))));
+  }
+
+  @Test
+  public void testShouldNotAllowPayloadFieldWithUserFields() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .build()
+                .validateSerializableFields(
+                    merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_BYTES_PAYLOAD)));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PubsubRowToMessage.builder()
+                .build()
+                .validateSerializableFields(
+                    merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_ROW_PAYLOAD)));
+  }
+
+  @Test
+  public void testSchemaReflection_matchesAll() {
+    SchemaReflection schemaReflection = SchemaReflection.of(ALL_DATA_TYPES_SCHEMA);
+
+    assertTrue(
+        schemaReflection.matchesAll(
+            FieldMatcher.of(BOOLEAN_FIELD.getName()),
+            FieldMatcher.of(BYTE_FIELD.getName()),
+            FieldMatcher.of(DATETIME_FIELD.getName()),
+            FieldMatcher.of(DECIMAL_FIELD.getName()),
+            FieldMatcher.of(DOUBLE_FIELD.getName()),
+            FieldMatcher.of(FLOAT_FIELD.getName()),
+            FieldMatcher.of(INT16_FIELD.getName()),
+            FieldMatcher.of(INT32_FIELD.getName()),
+            FieldMatcher.of(INT64_FIELD.getName()),
+            FieldMatcher.of(STRING_FIELD.getName())));
+
+    assertTrue(
+        schemaReflection.matchesAll(
+            FieldMatcher.of(BOOLEAN_FIELD.getName(), FieldType.BOOLEAN),
+            FieldMatcher.of(BYTE_FIELD.getName(), FieldType.BYTE),
+            FieldMatcher.of(DATETIME_FIELD.getName(), FieldType.DATETIME),
+            FieldMatcher.of(DECIMAL_FIELD.getName(), FieldType.DECIMAL),
+            FieldMatcher.of(DOUBLE_FIELD.getName(), FieldType.DOUBLE),
+            FieldMatcher.of(FLOAT_FIELD.getName(), FieldType.FLOAT),
+            FieldMatcher.of(INT16_FIELD.getName(), FieldType.INT16),
+            FieldMatcher.of(INT32_FIELD.getName(), FieldType.INT32),
+            FieldMatcher.of(INT64_FIELD.getName(), FieldType.INT64),
+            FieldMatcher.of(STRING_FIELD.getName(), FieldType.STRING)));
+
+    assertTrue(
+        schemaReflection.matchesAll(
+            FieldMatcher.of(BOOLEAN_FIELD.getName(), TypeName.BOOLEAN),
+            FieldMatcher.of(BYTE_FIELD.getName(), TypeName.BYTE),
+            FieldMatcher.of(DATETIME_FIELD.getName(), TypeName.DATETIME),
+            FieldMatcher.of(DECIMAL_FIELD.getName(), TypeName.DECIMAL),
+            FieldMatcher.of(DOUBLE_FIELD.getName(), TypeName.DOUBLE),
+            FieldMatcher.of(FLOAT_FIELD.getName(), TypeName.FLOAT),
+            FieldMatcher.of(INT16_FIELD.getName(), TypeName.INT16),
+            FieldMatcher.of(INT32_FIELD.getName(), TypeName.INT32),
+            FieldMatcher.of(INT64_FIELD.getName(), TypeName.INT64),
+            FieldMatcher.of(STRING_FIELD.getName(), TypeName.STRING)));
+
+    assertFalse(
+        schemaReflection.matchesAll(
+            FieldMatcher.of("idontexist"),
+            FieldMatcher.of(BOOLEAN_FIELD.getName()),
+            FieldMatcher.of(BYTE_FIELD.getName()),
+            FieldMatcher.of(DATETIME_FIELD.getName()),
+            FieldMatcher.of(DECIMAL_FIELD.getName()),
+            FieldMatcher.of(DOUBLE_FIELD.getName()),
+            FieldMatcher.of(FLOAT_FIELD.getName()),
+            FieldMatcher.of(INT16_FIELD.getName()),
+            FieldMatcher.of(INT32_FIELD.getName()),
+            FieldMatcher.of(INT64_FIELD.getName()),
+            FieldMatcher.of(STRING_FIELD.getName())));
+
+    assertFalse(
+        schemaReflection.matchesAll(
+            FieldMatcher.of(BOOLEAN_FIELD.getName(), FieldType.BOOLEAN),
+            FieldMatcher.of(BYTE_FIELD.getName(), FieldType.BYTE),
+            FieldMatcher.of(DATETIME_FIELD.getName(), FieldType.DATETIME),
+            FieldMatcher.of(DECIMAL_FIELD.getName(), FieldType.DECIMAL),
+            FieldMatcher.of(DOUBLE_FIELD.getName(), FieldType.DOUBLE),
+            FieldMatcher.of(FLOAT_FIELD.getName(), FieldType.FLOAT),
+            FieldMatcher.of(INT16_FIELD.getName(), FieldType.INT16),
+            FieldMatcher.of(INT32_FIELD.getName(), FieldType.INT32),
+            FieldMatcher.of(INT64_FIELD.getName(), FieldType.INT64),
+            // should not match type:
+            FieldMatcher.of(STRING_FIELD.getName(), FieldType.BYTE)));
+
+    assertFalse(
+        schemaReflection.matchesAll(
+            FieldMatcher.of(BOOLEAN_FIELD.getName(), TypeName.BOOLEAN),
+            FieldMatcher.of(BYTE_FIELD.getName(), TypeName.BYTE),
+            FieldMatcher.of(DATETIME_FIELD.getName(), TypeName.DATETIME),
+            FieldMatcher.of(DECIMAL_FIELD.getName(), TypeName.DECIMAL),
+            FieldMatcher.of(DOUBLE_FIELD.getName(), TypeName.DOUBLE),
+            FieldMatcher.of(FLOAT_FIELD.getName(), TypeName.FLOAT),
+            FieldMatcher.of(INT16_FIELD.getName(), TypeName.INT16),
+            FieldMatcher.of(INT32_FIELD.getName(), TypeName.INT32),
+            FieldMatcher.of(INT64_FIELD.getName(), TypeName.INT64),
+            // should not match TypeName:
+            FieldMatcher.of(STRING_FIELD.getName(), TypeName.INT16)));
+  }
+
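+  // A name-only FieldMatcher matches any schema that contains a field with that name.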
+  @Test
+  public void testFieldMatcher_match_NameOnly() {
+    FieldMatcher fieldMatcher = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME);
+    assertTrue(fieldMatcher.match(NON_USER_WITH_ROW_PAYLOAD));
+    assertTrue(fieldMatcher.match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(fieldMatcher.match(ALL_DATA_TYPES_SCHEMA));
+  }
+
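+  // A FieldMatcher built with a TypeName also requires the named field's type to match.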
+  @Test
+  public void testFieldMatcher_match_TypeName() {
+    assertTrue(
+        FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW).match(NON_USER_WITH_ROW_PAYLOAD));
+    assertTrue(
+        FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.BYTES)
+            .match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(
+        FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW).match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(
+        FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.BYTES).match(NON_USER_WITH_ROW_PAYLOAD));
+  }
+
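+  // A FieldMatcher built with a full FieldType requires an exact match, including the map's key
+  // and value types.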
+  @Test
+  public void testFieldMatcher_match_FieldType() {
+    assertTrue(
+        FieldMatcher.of(
+                DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.map(FieldType.STRING, FieldType.STRING))
+            .match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(
+        FieldMatcher.of(
+                DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.map(FieldType.STRING, FieldType.BYTES))
+            .match(NON_USER_WITH_BYTES_PAYLOAD));
+  }
+
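+  // Removing every field from a schema should yield an empty schema.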
+  @Test
+  public void testRemoveFields() {
+    Schema input = Schema.of(STRING_FIELD, BOOLEAN_FIELD, INT64_FIELD);
+    Schema expected = Schema.builder().build();
+    Schema actual =
+        removeFields(input, STRING_FIELD.getName(), BOOLEAN_FIELD.getName(), INT64_FIELD.getName());
+    assertEquals(expected, actual);
+  }
+
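+  // attributesWithoutTimestamp should return the row's attributes map, and an empty map when the
+  // schema has no attributes field.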
+  @Test
+  public void testPubsubRowToMessageParDo_attributesWithoutTimestamp() {
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("a", "1");
+    attributes.put("b", "2");
+    attributes.put("c", "3");
+
+    Row withAttributes =
+        Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+            .addValues(attributes, Instant.now(), new byte[] {})
+            .build();
+
+    assertEquals(
+        attributes,
+        doFn(NON_USER_WITH_BYTES_PAYLOAD, null).attributesWithoutTimestamp(withAttributes));
+
+    Schema withoutAttributesSchema =
+        removeFields(NON_USER_WITH_BYTES_PAYLOAD, DEFAULT_ATTRIBUTES_KEY_NAME);
+    Row withoutAttributes =
+        Row.withSchema(withoutAttributesSchema).addValues(Instant.now(), new byte[] {}).build();
+
+    assertEquals(
+        ImmutableMap.of(),
+        doFn(withoutAttributesSchema, null).attributesWithoutTimestamp(withoutAttributes));
+  }
+
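+  // timestamp should read the event timestamp field when present and still produce a non-null
+  // value when the field is absent.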
+  @Test
+  public void testPubsubRowToMessageParDo_timestamp() {
+    Instant timestamp = Instant.now();
+    Row withTimestamp =
+        Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+            .addValues(ImmutableMap.of(), timestamp, new byte[] {})
+            .build();
+    assertEquals(
+        timestamp.toString(),
+        doFn(NON_USER_WITH_BYTES_PAYLOAD, null).timestamp(withTimestamp).toString());
+
+    Schema withoutTimestampSchema =
+        removeFields(NON_USER_WITH_BYTES_PAYLOAD, DEFAULT_EVENT_TIMESTAMP_KEY_NAME);
+    Row withoutTimestamp =
+        Row.withSchema(withoutTimestampSchema).addValues(ImmutableMap.of(), new byte[] {}).build();
+    ReadableDateTime actual = doFn(withoutTimestampSchema, null).timestamp(withoutTimestamp);
+    assertNotNull(actual);
+  }
+
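+  // payload should pass raw bytes through unchanged, serialize a row payload with the configured
+  // serializer, and serialize the user fields when the schema has no payload field.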
+  @Test
+  public void testPubsubRowToMessageParDo_payload() {
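+    // Note: both serializers below are built from JsonPayloadSerializerProvider; each assertion
+    // compares against the same serializer instance that the DoFn is configured with.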
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+    PayloadSerializer avroPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    byte[] bytes = "abcdefg".getBytes(StandardCharsets.UTF_8);
+    assertArrayEquals(
+        bytes,
+        doFn(NON_USER_WITH_BYTES_PAYLOAD, null)
+            .payload(
+                Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+                    .addValues(ImmutableMap.of(), Instant.now(), bytes)
+                    .build()));
+
+    Row withoutUserFields =
+        Row.withSchema(NON_USER_WITH_ROW_PAYLOAD)
+            .addValues(
+                ImmutableMap.of(),
+                Instant.now(),
+                rowWithAllDataTypes(
+                    true,
+                    (byte) 1,
+                    Instant.now().toDateTime(),
+                    BigDecimal.valueOf(100L),
+                    1.12345,
+                    1.1f,
+                    (short) 1,
+                    1,
+                    1L,
+                    "abcdefg"))
+            .build();
+
+    assertArrayEquals(
+        jsonPayloadSerializer.serialize(withoutUserFields.getRow(DEFAULT_PAYLOAD_KEY_NAME)),
+        doFn(NON_USER_WITH_ROW_PAYLOAD, jsonPayloadSerializer).payload(withoutUserFields));
+
+    assertArrayEquals(
+        avroPayloadSerializer.serialize(withoutUserFields.getRow(DEFAULT_PAYLOAD_KEY_NAME)),
+        doFn(NON_USER_WITH_ROW_PAYLOAD, avroPayloadSerializer).payload(withoutUserFields));
+
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("a", "1");
+    attributes.put("b", "2");
+    attributes.put("c", "3");
+
+    Row withAttributesAndTimestamp =
+        Row.withSchema(NON_USER_WITHOUT_PAYLOAD).addValues(attributes, Instant.now()).build();
+
+    Row withUserFields =
+        rowWithAllDataTypes(
+            false,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(200L),
+            2.12345,
+            2.1f,
+            (short) 2,
+            2,
+            2L,
+            "1234567");
+
+    Row withUserFieldsAndAttributesTimestamp = merge(withAttributesAndTimestamp, withUserFields);
+
+    assertArrayEquals(
+        jsonPayloadSerializer.serialize(withUserFields),
+        doFn(withUserFieldsAndAttributesTimestamp.getSchema(), jsonPayloadSerializer)
+            .payload(withUserFieldsAndAttributesTimestamp));
+
+    assertArrayEquals(
+        avroPayloadSerializer.serialize(withUserFields),
+        doFn(withUserFieldsAndAttributesTimestamp.getSchema(), avroPayloadSerializer)
+            .payload(withUserFieldsAndAttributesTimestamp));
+  }
+
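+  // serializableRow should return the row destined for the payload serializer: the nested row
+  // payload when present, otherwise a row of just the user fields. A row with a raw bytes payload
+  // and no serializer configured is rejected.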
+  @Test
+  public void testPubsubRowToMessageParDo_serializableRow() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            doFn(NON_USER_WITH_BYTES_PAYLOAD, null)
+                .serializableRow(
+                    Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+                        .attachValues(ImmutableMap.of(), Instant.now(), new byte[] {})));
+
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("a", "1");
+    attributes.put("b", "2");
+    attributes.put("c", "3");
+
+    Row withAllDataTypes =
+        rowWithAllDataTypes(
+            true,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(1L),
+            3.12345,
+            4.1f,
+            (short) 5,
+            2,
+            7L,
+            "asdfjkl;");
+
+    Row nonUserWithRowPayload =
+        Row.withSchema(NON_USER_WITH_ROW_PAYLOAD)
+            .attachValues(attributes, Instant.now(), withAllDataTypes);
+
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    assertEquals(
+        withAllDataTypes,
+        doFn(NON_USER_WITH_ROW_PAYLOAD, jsonPayloadSerializer)
+            .serializableRow(nonUserWithRowPayload));
+
+    Row withAttributesAndTimestamp =
+        Row.withSchema(NON_USER_WITHOUT_PAYLOAD).addValues(attributes, Instant.now()).build();
+
+    Row withUserFieldsAttributesAndTimestamp = merge(withAttributesAndTimestamp, withAllDataTypes);
+
+    assertEquals(
+        withAllDataTypes,
+        doFn(withUserFieldsAttributesAndTimestamp.getSchema(), jsonPayloadSerializer)
+            .serializableRow(withUserFieldsAttributesAndTimestamp));
+  }
+
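+  /**
+   * Builds a {@link PubsubRowToMessageDoFn} with the default key names, an error schema derived
+   * from {@code schema}, and the given (possibly null) payload serializer.
+   */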
+  private static PubsubRowToMessageDoFn doFn(Schema schema, PayloadSerializer payloadSerializer) {
+    return new PubsubRowToMessageDoFn(
+        DEFAULT_ATTRIBUTES_KEY_NAME,
+        DEFAULT_EVENT_TIMESTAMP_KEY_NAME,
+        DEFAULT_PAYLOAD_KEY_NAME,
+        errorSchema(schema),
+        DEFAULT_EVENT_TIMESTAMP_KEY_NAME,
+        null,
+        payloadSerializer);
+  }
+
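+  /** Builds a {@link Row} of {@code ALL_DATA_TYPES_SCHEMA} from the supplied values. */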
+  private static Row rowWithAllDataTypes(
+      boolean boolean0,
+      byte byte0,
+      ReadableDateTime datetime,
+      BigDecimal decimal,
+      Double double0,
+      Float float0,
+      Short int16,
+      Integer int32,
+      Long int64,
+      String string) {
+    return Row.withSchema(ALL_DATA_TYPES_SCHEMA)
+        .addValues(boolean0, byte0, datetime, decimal, double0, float0, int16, int32, int64, string)
+        .build();
+  }
+
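+  /** Returns a {@link Schema} with the fields of {@code a} followed by those of {@code b}. */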
+  private static Schema merge(Schema a, Schema b) {
+    List<Field> fields = new ArrayList<>(a.getFields());
+    fields.addAll(b.getFields());
+    Schema.Builder builder = Schema.builder();
+    for (Field field : fields) {
+      builder = builder.addField(field);
+    }
+    return builder.build();
+  }
+
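+  /** Returns a {@link Row} over the merged schema, populated with the values of both rows. */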
+  private static Row merge(Row a, Row b) {
+    Schema schema = merge(a.getSchema(), b.getSchema());
+    Map<String, Object> values = new HashMap<>();
+    for (String name : a.getSchema().getFieldNames()) {
+      values.put(name, a.getValue(name));
+    }
+    for (String name : b.getSchema().getFieldNames()) {
+      values.put(name, b.getValue(name));
+    }
+    return Row.withSchema(schema).withFieldValues(values).build();
+  }
+}