You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/11/11 19:12:18 UTC
[beam] branch master updated: Implement PubsubRowToMessage transform (#23897)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9fcd20c3712 Implement PubsubRowToMessage transform (#23897)
9fcd20c3712 is described below
commit 9fcd20c3712536f2d4580beead678cdbb6fd4746
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Fri Nov 11 11:12:11 2022 -0800
Implement PubsubRowToMessage transform (#23897)
* Begin PubsubRowToMessage Impl
* Complete working draft
* Unit tests validate user and non-user fields
* Finish tests on supporting methods
* Pass checks before finalizing tests
* WIP
* fix timestamp
* finalize tests
* Finalize code comments
* Clean up check findings
* Add InputSchemaFactory
* Patch code comment typo
---
.../beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java | 577 +++++++++++
.../sdk/io/gcp/pubsub/PubsubRowToMessageTest.java | 1085 ++++++++++++++++++++
2 files changed, 1662 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java
new file mode 100644
index 00000000000..9d7bbcf67c4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+
+/**
+ * Write side {@link Row} to {@link PubsubMessage} converter.
+ *
+ * <p>An input {@link Row} may contain the non-user fields {@link #getAttributesKeyName()}, {@link
+ * #getSourceEventTimestampKeyName()} and {@link #getPayloadKeyName()}; every other field is treated
+ * as a user field. Converted messages are emitted to {@link #OUTPUT}; elements whose conversion
+ * throws are emitted to {@link #ERROR} with the {@link Schema} returned by {@link
+ * #errorSchema(Schema)}.
+ */
+@Internal
+@Experimental
+@AutoValue
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+abstract class PubsubRowToMessage extends PTransform<PCollection<Row>, PCollectionTuple> {
+
+  static Builder builder() {
+    return new AutoValue_PubsubRowToMessage.Builder();
+  }
+
+  /** Tag for successfully converted {@link PubsubMessage}s. */
+  static final TupleTag<PubsubMessage> OUTPUT = new TupleTag<PubsubMessage>() {};
+
+  /** Tag for input {@link Row}s that failed conversion, wrapped per {@link #errorSchema(Schema)}. */
+  static final TupleTag<Row> ERROR = new TupleTag<Row>() {};
+
+  static final String ERROR_DATA_FIELD_NAME = "data";
+  static final Field ERROR_MESSAGE_FIELD = Field.of("error_message", FieldType.STRING);
+  static final Field ERROR_STACK_TRACE_FIELD = Field.of("error_stack_trace", FieldType.STRING);
+
+  static final String DEFAULT_KEY_PREFIX = "$";
+  static final String ATTRIBUTES_KEY_NAME = "pubsub_attributes";
+  static final FieldType ATTRIBUTES_FIELD_TYPE = FieldType.map(FieldType.STRING, FieldType.STRING);
+
+  static final String EVENT_TIMESTAMP_KEY_NAME = "pubsub_event_timestamp";
+  static final FieldType EVENT_TIMESTAMP_FIELD_TYPE = FieldType.DATETIME;
+
+  static final String PAYLOAD_KEY_NAME = "pubsub_payload";
+  static final TypeName PAYLOAD_BYTES_TYPE_NAME = TypeName.BYTES;
+  static final TypeName PAYLOAD_ROW_TYPE_NAME = TypeName.ROW;
+
+  /** The prefix for all non-user fields. Defaults to {@link #DEFAULT_KEY_PREFIX}. */
+  abstract String getKeyPrefix();
+
+  /**
+   * The {@link PayloadSerializer} to {@link PayloadSerializer#serialize(Row)} the payload or user
+   * fields row. Must be null for a {@link TypeName#BYTES} payload and non-null otherwise.
+   */
+  @Nullable
+  abstract PayloadSerializer getPayloadSerializer();
+
+  /**
+   * The attribute name under which the event timestamp is written to the {@link PubsubMessage}.
+   * When null, {@link #getSourceEventTimestampKeyName()} is used as the attribute name.
+   */
+  @Nullable
+  abstract String getTargetTimestampAttributeName();
+
+  /**
+   * Use for testing, simplify assertions of generated timestamp when input lacks a timestamp field.
+   */
+  @Nullable
+  abstract Instant getMockInstant();
+
+  /**
+   * Generates {@link Schema} of the {@link #ERROR} {@link PCollection}: the failed input row under
+   * {@link #ERROR_DATA_FIELD_NAME}, plus the error message and stack trace.
+   */
+  static Schema errorSchema(Schema inputSchema) {
+    Field dataField = Field.of(ERROR_DATA_FIELD_NAME, FieldType.row(inputSchema));
+    return Schema.of(dataField, ERROR_MESSAGE_FIELD, ERROR_STACK_TRACE_FIELD);
+  }
+
+  /**
+   * As a convenience method, generates {@link InputSchemaFactory} for expected {@link Schema} for
+   * {@link Row} input into {@link PubsubRowToMessage}, excluding {@link Field} for {@link
+   * #getPayloadKeyName()}. See {@link InputSchemaFactory#buildSchema(Field...)} for details on how
+   * to add additional fields.
+   */
+  InputSchemaFactory inputSchemaFactory() {
+    return inputSchemaFactory(null);
+  }
+
+  /**
+   * As a convenience method, generates {@link InputSchemaFactory} for expected {@link Schema} for
+   * {@link Row} input into {@link PubsubRowToMessage}. The {@link Field} for {@link
+   * #getPayloadKeyName()} is excluded when {@code payloadFieldType} is null. See {@link
+   * InputSchemaFactory#buildSchema(Field...)} for details on how to add additional fields.
+   *
+   * @param payloadFieldType type of the payload field, or null to omit the payload field
+   */
+  InputSchemaFactory inputSchemaFactory(@Nullable FieldType payloadFieldType) {
+    InputSchemaFactory.Builder builder =
+        InputSchemaFactory.builder()
+            .setAttributesField(Field.of(getAttributesKeyName(), ATTRIBUTES_FIELD_TYPE))
+            .setTimestampField(
+                Field.of(getSourceEventTimestampKeyName(), EVENT_TIMESTAMP_FIELD_TYPE));
+
+    if (payloadFieldType != null) {
+      builder = builder.setPayloadField(Field.of(getPayloadKeyName(), payloadFieldType));
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public PCollectionTuple expand(PCollection<Row> input) {
+    Schema schema = input.getSchema();
+    validate(schema);
+    Schema errorSchema = errorSchema(schema);
+    PCollectionTuple pct =
+        input.apply(
+            PubsubRowToMessage.class.getSimpleName(),
+            ParDo.of(
+                    new PubsubRowToMessageDoFn(
+                        getAttributesKeyName(),
+                        getSourceEventTimestampKeyName(),
+                        getPayloadKeyName(),
+                        errorSchema,
+                        getTargetTimestampAttributeName(),
+                        getMockInstant(),
+                        getPayloadSerializer()))
+                .withOutputTags(OUTPUT, TupleTagList.of(ERROR)));
+
+    PCollection<PubsubMessage> output = pct.get(OUTPUT);
+    PCollection<Row> error = pct.get(ERROR).setRowSchema(errorSchema);
+    return PCollectionTuple.of(OUTPUT, output).and(ERROR, error);
+  }
+
+  /** Returns the name of the source attributes key, prefixed with {@link #getKeyPrefix()}. */
+  String getAttributesKeyName() {
+    return getKeyPrefix() + ATTRIBUTES_KEY_NAME;
+  }
+
+  /** Returns the name of the source timestamp key, prefixed with {@link #getKeyPrefix()}. */
+  String getSourceEventTimestampKeyName() {
+    return getKeyPrefix() + EVENT_TIMESTAMP_KEY_NAME;
+  }
+
+  /** Returns the name of the source payload key, prefixed with {@link #getKeyPrefix()}. */
+  String getPayloadKeyName() {
+    return getKeyPrefix() + PAYLOAD_KEY_NAME;
+  }
+
+  /**
+   * Validates an input's {@link Schema} for correctness.
+   *
+   * @throws IllegalArgumentException if the schema is empty or any field check fails
+   */
+  void validate(Schema schema) {
+
+    if (schema.getFieldCount() == 0) {
+      throw new IllegalArgumentException(
+          String.format("Schema must contain at least one field. Schema: %s", schema));
+    }
+
+    validateAttributesField(schema);
+    validateSourceEventTimeStampField(schema);
+    validateSerializableFields(schema);
+  }
+
+  /**
+   * Validates an input's {@link Schema} for its {@link #getAttributesKeyName()} field correctness,
+   * if exists.
+   *
+   * @throws IllegalArgumentException if the field exists with a type other than {@link
+   *     #ATTRIBUTES_FIELD_TYPE}
+   */
+  void validateAttributesField(Schema schema) {
+    String attributesKeyName = getAttributesKeyName();
+    if (!schema.hasField(attributesKeyName)) {
+      return;
+    }
+    checkArgument(
+        SchemaReflection.of(schema)
+            .matchesAll(FieldMatcher.of(attributesKeyName, ATTRIBUTES_FIELD_TYPE)),
+        String.format(
+            "schema field: %s must be of type: %s, found: %s",
+            attributesKeyName,
+            ATTRIBUTES_FIELD_TYPE,
+            schema.getField(attributesKeyName).getType()));
+  }
+
+  /**
+   * Validates an input's {@link Schema} for its {@link #getSourceEventTimestampKeyName()} field
+   * correctness, if exists.
+   *
+   * @throws IllegalArgumentException if the field exists with a type other than {@link
+   *     #EVENT_TIMESTAMP_FIELD_TYPE}
+   */
+  void validateSourceEventTimeStampField(Schema schema) {
+    String eventTimestampKeyName = getSourceEventTimestampKeyName();
+    if (!schema.hasField(eventTimestampKeyName)) {
+      return;
+    }
+    checkArgument(
+        SchemaReflection.of(schema)
+            .matchesAll(FieldMatcher.of(eventTimestampKeyName, EVENT_TIMESTAMP_FIELD_TYPE)),
+        String.format(
+            "schema field: %s must be of type: %s, found: %s",
+            eventTimestampKeyName,
+            EVENT_TIMESTAMP_FIELD_TYPE,
+            schema.getField(eventTimestampKeyName).getType()));
+  }
+
+  /**
+   * Validates an input's {@link Schema} for either {@link #getPayloadKeyName()} or user fields
+   * correctness. Additionally, it validates the {@link #getPayloadSerializer()} null state based on
+   * the {@link #getPayloadKeyName()} {@link FieldType} or the presence of user fields.
+   *
+   * @throws IllegalArgumentException if neither or both of payload and user fields are present, if
+   *     the payload field is neither {@link TypeName#BYTES} nor {@link TypeName#ROW}, or if the
+   *     serializer presence does not match the payload source
+   */
+  void validateSerializableFields(Schema schema) {
+    String attributesKeyName = getAttributesKeyName();
+    String eventTimestampKeyName = getSourceEventTimestampKeyName();
+    String payloadKeyName = getPayloadKeyName();
+    Schema withUserFieldsOnly =
+        removeFields(schema, attributesKeyName, eventTimestampKeyName, payloadKeyName);
+    boolean hasUserFields = withUserFieldsOnly.getFieldCount() > 0;
+    String withUserFieldsList = String.join(", ", withUserFieldsOnly.getFieldNames());
+    SchemaReflection schemaReflection = SchemaReflection.of(schema);
+    boolean hasPayloadField = schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName));
+    boolean hasPayloadRowField =
+        schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_ROW_TYPE_NAME));
+    boolean hasPayloadBytesField =
+        schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_BYTES_TYPE_NAME));
+    boolean hasBothUserFieldsAndPayloadField = hasUserFields && hasPayloadField;
+
+    checkArgument(
+        hasUserFields || hasPayloadField,
+        String.format(
+            "schema must have either a %s field or user fields i.e. not %s, %s or %s",
+            payloadKeyName, attributesKeyName, eventTimestampKeyName, payloadKeyName));
+
+    checkArgument(
+        !hasBothUserFieldsAndPayloadField,
+        String.format(
+            "schema field: %s incompatible with %s fields", payloadKeyName, withUserFieldsList));
+
+    // Only BYTES and ROW payloads can be turned into message data; reject anything else here
+    // instead of failing (or silently serializing the payload as a user field) per element.
+    if (hasPayloadField) {
+      checkArgument(
+          hasPayloadBytesField || hasPayloadRowField,
+          String.format(
+              "schema field: %s must be of type: %s or %s, found: %s",
+              payloadKeyName,
+              PAYLOAD_BYTES_TYPE_NAME,
+              PAYLOAD_ROW_TYPE_NAME,
+              schema.getField(payloadKeyName).getType().getTypeName()));
+    }
+
+    if (hasPayloadBytesField) {
+      checkArgument(
+          getPayloadSerializer() == null,
+          String.format(
+              "schema field: %s of type: %s with a %s is incompatible",
+              payloadKeyName, PAYLOAD_BYTES_TYPE_NAME, PayloadSerializer.class.getName()));
+    }
+
+    if (hasPayloadRowField) {
+      checkArgument(
+          getPayloadSerializer() != null,
+          String.format(
+              "schema field: %s of type: %s requires a %s",
+              payloadKeyName, PAYLOAD_ROW_TYPE_NAME, PayloadSerializer.class.getName()));
+    }
+
+    if (hasUserFields) {
+      checkArgument(
+          getPayloadSerializer() != null,
+          String.format(
+              "specifying schema fields: %s requires a %s",
+              withUserFieldsList, PayloadSerializer.class.getName()));
+    }
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setKeyPrefix(String value);
+
+    abstract Optional<String> getKeyPrefix();
+
+    abstract Builder setPayloadSerializer(PayloadSerializer value);
+
+    abstract Builder setTargetTimestampAttributeName(String value);
+
+    abstract Builder setMockInstant(Instant value);
+
+    abstract PubsubRowToMessage autoBuild();
+
+    /** Builds the transform, defaulting the key prefix to {@link #DEFAULT_KEY_PREFIX}. */
+    final PubsubRowToMessage build() {
+      if (!getKeyPrefix().isPresent()) {
+        setKeyPrefix(DEFAULT_KEY_PREFIX);
+      }
+      return autoBuild();
+    }
+  }
+
+  /** {@link SchemaReflection} is a helper class for reflecting fields of a {@link Schema}. */
+  static class SchemaReflection {
+    static SchemaReflection of(Schema schema) {
+      return new SchemaReflection(schema);
+    }
+
+    private final Schema schema;
+
+    private SchemaReflection(Schema schema) {
+      this.schema = schema;
+    }
+
+    /**
+     * Returns true if all {@code fieldMatchers} {@link FieldMatcher#match(Schema)}; true for an
+     * empty argument list.
+     */
+    boolean matchesAll(FieldMatcher... fieldMatchers) {
+      for (FieldMatcher fieldMatcher : fieldMatchers) {
+        if (!fieldMatcher.match(schema)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * {@link FieldMatcher} matches fields in a {@link Schema} by name and, optionally, by {@link
+   * TypeName} or by exact {@link FieldType}.
+   */
+  static class FieldMatcher {
+    static FieldMatcher of(String name) {
+      return new FieldMatcher(name);
+    }
+
+    static FieldMatcher of(String name, TypeName typeName) {
+      return new FieldMatcher(name, typeName);
+    }
+
+    static FieldMatcher of(String name, FieldType fieldType) {
+      return new FieldMatcher(name, fieldType);
+    }
+
+    private final String name;
+
+    @Nullable private final TypeName typeName;
+
+    @Nullable private final FieldType fieldType;
+
+    private FieldMatcher(String name, @Nullable TypeName typeName, @Nullable FieldType fieldType) {
+      this.name = name;
+      this.typeName = typeName;
+      this.fieldType = fieldType;
+    }
+
+    private FieldMatcher(String name) {
+      this(name, null, null);
+    }
+
+    private FieldMatcher(String name, TypeName typeName) {
+      this(name, typeName, null);
+    }
+
+    private FieldMatcher(String name, FieldType fieldType) {
+      this(name, null, fieldType);
+    }
+
+    /** Returns true when the {@link Field} in a {@link Schema} matches its search criteria. */
+    boolean match(Schema schema) {
+      if (!schema.hasField(name)) {
+        return false;
+      }
+      if (typeName == null && fieldType == null) {
+        return true;
+      }
+      Field field = schema.getField(name);
+      if (typeName != null) {
+        return field.getType().getTypeName().equals(typeName);
+      }
+      // fieldType is non-null here; FieldType equality includes nullability.
+      return fieldType.equals(field.getType());
+    }
+  }
+
+  /** Returns a copy of {@code schema} without the named {@code fields}, preserving field order. */
+  static Schema removeFields(Schema schema, String... fields) {
+    List<String> exclude = Arrays.stream(fields).collect(Collectors.toList());
+    Schema.Builder builder = Schema.builder();
+    for (Field field : schema.getFields()) {
+      if (exclude.contains(field.getName())) {
+        continue;
+      }
+      builder.addField(field);
+    }
+    return builder.build();
+  }
+
+  /** A {@link DoFn} that converts a {@link Row} to a {@link PubsubMessage}. */
+  static class PubsubRowToMessageDoFn extends DoFn<Row, PubsubMessage> {
+
+    private final String attributesKeyName;
+    private final String sourceTimestampKeyName;
+    private final String payloadKeyName;
+    private final Schema errorSchema;
+
+    @Nullable private final String targetTimestampKeyName;
+    @Nullable private final PayloadSerializer payloadSerializer;
+
+    @Nullable private final Instant mockInstant;
+
+    PubsubRowToMessageDoFn(
+        String attributesKeyName,
+        String sourceTimestampKeyName,
+        String payloadKeyName,
+        Schema errorSchema,
+        @Nullable String targetTimestampKeyName,
+        @Nullable Instant mockInstant,
+        @Nullable PayloadSerializer payloadSerializer) {
+      this.attributesKeyName = attributesKeyName;
+      this.sourceTimestampKeyName = sourceTimestampKeyName;
+      this.payloadKeyName = payloadKeyName;
+      this.errorSchema = errorSchema;
+      this.targetTimestampKeyName = targetTimestampKeyName;
+      this.payloadSerializer = payloadSerializer;
+      this.mockInstant = mockInstant;
+    }
+
+    /**
+     * Emits a {@link PubsubMessage} to {@link #OUTPUT}, or on any exception an error {@link Row}
+     * (input, message, stack trace) to {@link #ERROR}.
+     */
+    @ProcessElement
+    public void process(@Element Row row, MultiOutputReceiver receiver) {
+      try {
+
+        Map<String, String> attributesWithoutTimestamp = this.attributesWithoutTimestamp(row);
+        String timestampAsString = this.timestampAsString(row);
+        String timestampKeyName = sourceTimestampKeyName;
+        if (targetTimestampKeyName != null) {
+          timestampKeyName = targetTimestampKeyName;
+        }
+        byte[] payload = this.payload(row);
+        // Copy so the input row's attribute map is never mutated.
+        HashMap<String, String> attributes = new HashMap<>(attributesWithoutTimestamp);
+        attributes.put(timestampKeyName, timestampAsString);
+        PubsubMessage message = new PubsubMessage(payload, attributes);
+        receiver.get(OUTPUT).output(message);
+
+      } catch (Exception e) {
+
+        // ERROR_MESSAGE_FIELD is non-nullable, but many exceptions (e.g. a bare
+        // NullPointerException) carry no message; fall back to the exception class name so the
+        // error Row can still be built and the element routed to ERROR.
+        String message = Objects.toString(e.getMessage(), e.getClass().getName());
+        String stackTrace = Throwables.getStackTraceAsString(e);
+        Row error =
+            Row.withSchema(errorSchema)
+                .withFieldValue(ERROR_DATA_FIELD_NAME, row)
+                .withFieldValue(ERROR_MESSAGE_FIELD.getName(), message)
+                .withFieldValue(ERROR_STACK_TRACE_FIELD.getName(), stackTrace)
+                .build();
+
+        receiver.get(ERROR).output(error);
+      }
+    }
+
+    /**
+     * Extracts the {@code Map<String, String>} attributes from a {@link Row} that contains the
+     * {@link #attributesKeyName}; returns an empty map when the field is absent.
+     */
+    Map<String, String> attributesWithoutTimestamp(Row row) {
+      if (!row.getSchema().hasField(attributesKeyName)) {
+        return new HashMap<>();
+      }
+      return row.getMap(attributesKeyName);
+    }
+
+    /**
+     * Outputs the {@link #timestamp(Row)} as a String in RFC 3339 format. For example, {@code
+     * 2015-10-29T23:41:41.123Z}.
+     */
+    String timestampAsString(Row row) {
+      return timestamp(row).toString();
+    }
+
+    /**
+     * Extracts a {@link ReadableDateTime} from a {@link Row} containing a {@link
+     * #sourceTimestampKeyName}. If the {@link Field} is missing, returns the mock instant when set,
+     * otherwise {@link Instant#now()}.
+     */
+    ReadableDateTime timestamp(Row row) {
+      if (row.getSchema().hasField(sourceTimestampKeyName)) {
+        return row.getDateTime(sourceTimestampKeyName);
+      }
+      Instant instant = mockInstant != null ? mockInstant : Instant.now();
+      return new DateTime(instant).withZone(instant.getZone());
+    }
+
+    /**
+     * Extracts a {@code byte[]} payload from a {@link Row}, from exactly one of the following
+     * mutually exclusive sources. <br>
+     * - {@link #payloadKeyName} {@link Field} with {@link FieldType#BYTES} <br>
+     * - serialized {@link #payloadKeyName} {@link Field} with {@link TypeName#ROW} using the {@link
+     * #payloadSerializer} <br>
+     * - serialized user fields provided that are not {@link #attributesKeyName} and {@link
+     * #sourceTimestampKeyName}
+     */
+    byte[] payload(Row row) {
+      if (SchemaReflection.of(row.getSchema())
+          .matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_BYTES_TYPE_NAME))) {
+        return row.getBytes(payloadKeyName);
+      }
+      return Objects.requireNonNull(payloadSerializer).serialize(serializableRow(row));
+    }
+
+    /**
+     * Extracts the serializable part of a {@link Row} from the following mutually exclusive
+     * sources. <br>
+     * - the {@link #payloadKeyName} {@link Field} with {@link TypeName#ROW} <br>
+     * - user fields, i.e. fields that are not {@link #attributesKeyName} and {@link
+     * #sourceTimestampKeyName}
+     *
+     * @throws IllegalArgumentException if the payload field is of type {@link TypeName#BYTES}
+     */
+    Row serializableRow(Row row) {
+      SchemaReflection schemaReflection = SchemaReflection.of(row.getSchema());
+
+      if (schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_BYTES_TYPE_NAME))) {
+        throw new IllegalArgumentException(
+            String.format(
+                "serializable Row does not exist for payload of type: %s",
+                PAYLOAD_BYTES_TYPE_NAME));
+      }
+
+      if (schemaReflection.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_ROW_TYPE_NAME))) {
+        return row.getRow(payloadKeyName);
+      }
+      Schema withUserFieldsOnly =
+          removeFields(row.getSchema(), attributesKeyName, sourceTimestampKeyName);
+      Map<String, Object> values = new HashMap<>();
+      for (String name : withUserFieldsOnly.getFieldNames()) {
+        values.put(name, row.getValue(name));
+      }
+      return Row.withSchema(withUserFieldsOnly).withFieldValues(values).build();
+    }
+  }
+
+  /**
+   * A convenience class for generating the expected {@link Schema} for {@link Row} input of {@link
+   * PubsubRowToMessage}.
+   */
+  @AutoValue
+  abstract static class InputSchemaFactory {
+    static Builder builder() {
+      return new AutoValue_PubsubRowToMessage_InputSchemaFactory.Builder();
+    }
+
+    abstract Field getAttributesField();
+
+    abstract Field getTimestampField();
+
+    @Nullable
+    abstract Field getPayloadField();
+
+    /**
+     * Builds a {@link Schema} from {@link #getAttributesField()}, {@link #getTimestampField()},
+     * {@link #getPayloadField()} (when non-null) and {@code additionalFields}, in that order. Users
+     * are encouraged to use {@link PubsubRowToMessage#removeFields(Schema, String...)} to customize
+     * the resulting {@link Schema}.
+     */
+    Schema buildSchema(Field... additionalFields) {
+      Schema.Builder builder =
+          Schema.builder().addField(getAttributesField()).addField(getTimestampField());
+
+      if (getPayloadField() != null) {
+        builder = builder.addField(getPayloadField());
+      }
+
+      for (Field field : additionalFields) {
+        builder = builder.addField(field);
+      }
+
+      return builder.build();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setAttributesField(Field value);
+
+      abstract Builder setTimestampField(Field value);
+
+      abstract Builder setPayloadField(Field value);
+
+      abstract InputSchemaFactory build();
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java
new file mode 100644
index 00000000000..5ef8bc473e9
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java
@@ -0,0 +1,1085 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_KEY_NAME;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_KEY_PREFIX;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ERROR;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_KEY_NAME;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.OUTPUT;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_KEY_NAME;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.errorSchema;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.removeFields;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.FieldMatcher;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PubsubRowToMessageDoFn;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.SchemaReflection;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PubsubRowToMessage}. */
+@RunWith(JUnit4.class)
+public class PubsubRowToMessageTest {
+
+  /** Options passed when running {@link #pipeline}; stable unique-name checks are disabled. */
+  private static final PipelineOptions PIPELINE_OPTIONS = createPipelineOptions();
+
+  private static final String DEFAULT_ATTRIBUTES_KEY_NAME =
+      DEFAULT_KEY_PREFIX + ATTRIBUTES_KEY_NAME;
+  private static final String DEFAULT_EVENT_TIMESTAMP_KEY_NAME =
+      DEFAULT_KEY_PREFIX + EVENT_TIMESTAMP_KEY_NAME;
+  private static final String DEFAULT_PAYLOAD_KEY_NAME = DEFAULT_KEY_PREFIX + PAYLOAD_KEY_NAME;
+
+  // One field per data type; combined into ALL_DATA_TYPES_SCHEMA below.
+  private static final Field BOOLEAN_FIELD = Field.of("boolean", FieldType.BOOLEAN);
+  private static final Field BYTE_FIELD = Field.of("byte", FieldType.BYTE);
+  private static final Field DATETIME_FIELD = Field.of("datetime", FieldType.DATETIME);
+  private static final Field DECIMAL_FIELD = Field.of("decimal", FieldType.DECIMAL);
+  private static final Field DOUBLE_FIELD = Field.of("double", FieldType.DOUBLE);
+  private static final Field FLOAT_FIELD = Field.of("float", FieldType.FLOAT);
+  private static final Field INT16_FIELD = Field.of("int16", FieldType.INT16);
+  private static final Field INT32_FIELD = Field.of("int32", FieldType.INT32);
+  private static final Field INT64_FIELD = Field.of("int64", FieldType.INT64);
+  private static final Field STRING_FIELD = Field.of("string", FieldType.STRING);
+
+  private static final Schema ALL_DATA_TYPES_SCHEMA =
+      Schema.of(
+          BOOLEAN_FIELD,
+          BYTE_FIELD,
+          DATETIME_FIELD,
+          DECIMAL_FIELD,
+          DOUBLE_FIELD,
+          FLOAT_FIELD,
+          INT16_FIELD,
+          INT32_FIELD,
+          INT64_FIELD,
+          STRING_FIELD);
+
+  /** Attributes + timestamp + a {@link FieldType#BYTES} payload. */
+  private static final Schema NON_USER_WITH_BYTES_PAYLOAD =
+      nonUserSchemaWith(Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES));
+
+  /** Attributes + timestamp + a row payload of {@link #ALL_DATA_TYPES_SCHEMA}. */
+  private static final Schema NON_USER_WITH_ROW_PAYLOAD =
+      nonUserSchemaWith(Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA)));
+
+  /** Attributes + timestamp only. */
+  private static final Schema NON_USER_WITHOUT_PAYLOAD = nonUserSchemaWith();
+
+  /** Creates the shared {@link PipelineOptions} with stable unique-name checks turned off. */
+  private static PipelineOptions createPipelineOptions() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setStableUniqueNames(CheckEnabled.OFF);
+    return options;
+  }
+
+  /** Builds a schema of the default attributes and timestamp fields followed by {@code extra}. */
+  private static Schema nonUserSchemaWith(Field... extra) {
+    List<Field> fields = new ArrayList<>();
+    fields.add(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE));
+    fields.add(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE));
+    for (Field field : extra) {
+      fields.add(field);
+    }
+    return Schema.of(fields.toArray(new Field[0]));
+  }
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  /** Pipeline for expand-time failures; it is never run, so abandoned nodes are tolerated. */
+  @Rule
+  public TestPipeline errorsTestPipeline =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  /**
+   * Verifies that {@link PubsubRowToMessage#expand} rejects each invalid input schema. Each case is
+   * asserted separately; a single {@code @Test(expected = ...)} would stop at the first throwing
+   * case and never exercise the rest.
+   */
+  @Test
+  public void testExpandThrowsExceptions() {
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    Instant embeddedTimestamp = Instant.now();
+    Instant mockTimestamp = Instant.ofEpochMilli(embeddedTimestamp.getMillis() + 1000000L);
+
+    Field attributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
+    Schema withAttributesSchema = Schema.of(attributesField);
+    Row withAttributes =
+        Row.withSchema(withAttributesSchema).attachValues(ImmutableMap.of("aaa", "111"));
+
+    Field timestampField = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
+    Schema withTimestampSchema = Schema.of(timestampField);
+    Row withEmbeddedTimestamp = Row.withSchema(withTimestampSchema).attachValues(embeddedTimestamp);
+
+    Field payloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
+    Schema withPayloadBytesSchema = Schema.of(payloadBytesField);
+    Row withPayloadBytes =
+        Row.withSchema(withPayloadBytesSchema).attachValues(new byte[] {1, 2, 3, 4});
+
+    Row withAllDataTypes =
+        rowWithAllDataTypes(
+            true,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(1L),
+            3.12345,
+            4.1f,
+            (short) 5,
+            2,
+            7L,
+            "asdfjkl;");
+
+    Field payloadRowField =
+        Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA));
+    Schema withPayloadRowSchema = Schema.of(payloadRowField);
+    Row withPayloadRow = Row.withSchema(withPayloadRowSchema).attachValues(withAllDataTypes);
+
+    PubsubRowToMessage transformWithoutSerializer =
+        PubsubRowToMessage.builder().setMockInstant(mockTimestamp).build();
+
+    PubsubRowToMessage transformWithSerializer =
+        PubsubRowToMessage.builder().setPayloadSerializer(jsonPayloadSerializer).build();
+
+    assertThrows(
+        "attributes only: requires either payload or user fields",
+        IllegalArgumentException.class,
+        () ->
+            errorsTestPipeline
+                .apply(Create.of(withAttributes))
+                .setRowSchema(withAttributesSchema)
+                .apply(transformWithoutSerializer));
+
+    assertThrows(
+        "timestamp only: requires either payload or user fields",
+        IllegalArgumentException.class,
+        () ->
+            errorsTestPipeline
+                .apply(Create.of(withEmbeddedTimestamp))
+                .setRowSchema(withTimestampSchema)
+                .apply(transformWithoutSerializer));
+
+    assertThrows(
+        "payload bytes incompatible with non-null payload serializer",
+        IllegalArgumentException.class,
+        () ->
+            errorsTestPipeline
+                .apply(Create.of(withPayloadBytes))
+                .setRowSchema(withPayloadBytesSchema)
+                .apply(transformWithSerializer));
+
+    assertThrows(
+        "payload row requires payload serializer",
+        IllegalArgumentException.class,
+        () ->
+            errorsTestPipeline
+                .apply(Create.of(withPayloadRow))
+                .setRowSchema(withPayloadRowSchema)
+                .apply(transformWithoutSerializer));
+
+    assertThrows(
+        "user fields row requires payload serializer",
+        IllegalArgumentException.class,
+        () ->
+            errorsTestPipeline
+                .apply(Create.of(withAllDataTypes))
+                .setRowSchema(ALL_DATA_TYPES_SCHEMA)
+                .apply(transformWithoutSerializer));
+
+    assertThrows(
+        "input with both payload row and user fields incompatible",
+        IllegalArgumentException.class,
+        () ->
+            errorsTestPipeline
+                .apply(Create.of(merge(withPayloadRow, withAllDataTypes)))
+                .setRowSchema(merge(withPayloadRowSchema, ALL_DATA_TYPES_SCHEMA))
+                .apply(transformWithSerializer));
+  }
+
+  /**
+   * Verifies that an element whose payload serialization fails at runtime is routed to {@link
+   * PubsubRowToMessage#ERROR}: the serializer here is built for a single STRING field while the
+   * input row carries every data type.
+   */
+  @Test
+  public void testExpandError() {
+    PayloadSerializer stringOnlySerializer =
+        new JsonPayloadSerializerProvider()
+            .getSerializer(Schema.of(STRING_FIELD), ImmutableMap.of());
+
+    Row input =
+        rowWithAllDataTypes(
+            true,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(1L),
+            3.12345,
+            4.1f,
+            (short) 5,
+            2,
+            7L,
+            "asdfjkl;");
+
+    PubsubRowToMessage transform =
+        PubsubRowToMessage.builder().setPayloadSerializer(stringOnlySerializer).build();
+
+    PCollection<Row> inputRows =
+        pipeline.apply(Create.of(input)).setRowSchema(ALL_DATA_TYPES_SCHEMA);
+    PCollection<Row> errors = inputRows.apply(transform).get(ERROR);
+    PCollection<Long> errorCount = errors.apply(Count.globally());
+
+    PAssert.that(errorCount).containsInAnyOrder(1L);
+
+    pipeline.run(PIPELINE_OPTIONS);
+  }
+
+ /**
+ * Exercises the OUTPUT tag of the transform against each supported input shape.
+ *
+ * <p>Each case variable is named aNtNpNuN: a = attributes field present, t = embedded event
+ * timestamp present, p = payload field present (suffix b = BYTES payload, r = ROW payload), u =
+ * user fields present. When the input row embeds no timestamp, the expected message carries the
+ * instant configured via setMockInstant instead of the embedded one.
+ */
+ @Test
+ public void testExpandOutput() {
+ PayloadSerializer jsonPayloadSerializer =
+ new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+ Instant embeddedTimestamp = Instant.now();
+ // Offset from the embedded timestamp so the mocked and embedded values never coincide and the
+ // expectations below can tell which one the transform used.
+ Instant mockTimestamp = Instant.ofEpochMilli(embeddedTimestamp.getMillis() + 1000000L);
+ String embeddedTimestampString = embeddedTimestamp.toString();
+ String mockTimestampString = mockTimestamp.toString();
+
+ // Multi-byte UTF-8 payload so the BYTES cases verify bytes are passed through untouched.
+ byte[] bytes =
+ "All the 🌎's a 🎦, and all the 🚹 and 🚺 merely players. They 🈷️their 🚪and their 🎟️and 1️⃣🚹in his time ▶️many🤺🪂🏆"
+ .getBytes(StandardCharsets.UTF_8);
+
+ // Single-field building blocks; the cases below merge them into combined input rows.
+ Field attributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
+ Field timestampField = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
+ Field payloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
+ Field payloadRowField =
+ Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA));
+
+ Row withAllDataTypes =
+ rowWithAllDataTypes(
+ true,
+ (byte) 0,
+ Instant.now().toDateTime(),
+ BigDecimal.valueOf(1L),
+ 3.12345,
+ 4.1f,
+ (short) 5,
+ 2,
+ 7L,
+ "asdfjkl;");
+
+ // Expected payload for every case whose payload is a row or user fields.
+ byte[] serializedWithAllDataTypes = jsonPayloadSerializer.serialize(withAllDataTypes);
+
+ Schema withAttributesSchema = Schema.of(attributesField);
+ Row withAttributes =
+ Row.withSchema(withAttributesSchema).attachValues(ImmutableMap.of("aaa", "111"));
+
+ Schema withTimestampSchema = Schema.of(timestampField);
+ Row withEmbeddedTimestamp = Row.withSchema(withTimestampSchema).attachValues(embeddedTimestamp);
+
+ Schema withPayloadBytesSchema = Schema.of(payloadBytesField);
+ Row withPayloadBytes = Row.withSchema(withPayloadBytesSchema).attachValues(bytes);
+
+ Schema withPayloadRowSchema = Schema.of(payloadRowField);
+ Row withPayloadRow = Row.withSchema(withPayloadRowSchema).attachValues(withAllDataTypes);
+
+ // BYTES payload cases use the transform without a serializer; row payload and user field
+ // cases use the JSON serializer. Both transforms share the same mocked instant.
+ PubsubRowToMessage transformWithoutSerializer =
+ PubsubRowToMessage.builder().setMockInstant(mockTimestamp).build();
+
+ PubsubRowToMessage transformWithSerializer =
+ PubsubRowToMessage.builder()
+ .setMockInstant(mockTimestamp)
+ .setPayloadSerializer(jsonPayloadSerializer)
+ .build();
+
+ PCollection<Row> a0t0p0u1 =
+ pipeline.apply(Create.of(withAllDataTypes)).setRowSchema(ALL_DATA_TYPES_SCHEMA);
+ PubsubMessage a0t0p0u1Message =
+ new PubsubMessage(
+ serializedWithAllDataTypes,
+ ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+ PAssert.that(
+ "expected: payload: user serialized row, attributes: ParDo mocked generated timestamp only",
+ a0t0p0u1.apply(transformWithSerializer).get(OUTPUT))
+ .containsInAnyOrder(a0t0p0u1Message);
+
+ PCollection<Row> a1t0p0u1 =
+ pipeline
+ .apply(Create.of(merge(withAttributes, withAllDataTypes)))
+ .setRowSchema(merge(withAttributesSchema, ALL_DATA_TYPES_SCHEMA));
+ PubsubMessage a1t0p0u1Message =
+ new PubsubMessage(
+ serializedWithAllDataTypes,
+ ImmutableMap.of("aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+ PAssert.that(
+ "expected: payload: user serialized row, attributes: input attributes and ParDo generated timestamp",
+ a1t0p0u1.apply(transformWithSerializer).get(OUTPUT))
+ .containsInAnyOrder(a1t0p0u1Message);
+
+ PCollection<Row> a0t1p0u1 =
+ pipeline
+ .apply(Create.of(merge(withEmbeddedTimestamp, withAllDataTypes)))
+ .setRowSchema(merge(withTimestampSchema, ALL_DATA_TYPES_SCHEMA));
+ PubsubMessage a0t1p0u1Message =
+ new PubsubMessage(
+ serializedWithAllDataTypes,
+ ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+ PAssert.that(
+ "expected: payload: user serialized row, attributes: timestamp from row input",
+ a0t1p0u1.apply(transformWithSerializer).get(OUTPUT))
+ .containsInAnyOrder(a0t1p0u1Message);
+
+ PCollection<Row> a0t0p1bu0 =
+ pipeline.apply(Create.of(withPayloadBytes)).setRowSchema(withPayloadBytesSchema);
+ PubsubMessage a0t0p1bu0Message =
+ new PubsubMessage(
+ bytes, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+ PAssert.that(
+ "expected: payload: raw bytes, attributes: ParDo generated timestamp",
+ a0t0p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+ .containsInAnyOrder(a0t0p1bu0Message);
+
+ PCollection<Row> a0t0p1ru0 =
+ pipeline.apply(Create.of(withPayloadRow)).setRowSchema(withPayloadRowSchema);
+ PubsubMessage a0t0p1ru0Message =
+ new PubsubMessage(
+ serializedWithAllDataTypes,
+ ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+ PAssert.that(
+ "expected: payload: non-user row, attributes: ParDo generated timestamp",
+ a0t0p1ru0.apply(transformWithSerializer).get(OUTPUT))
+ .containsInAnyOrder(a0t0p1ru0Message);
+
+ PCollection<Row> a1t0p1bu0 =
+ pipeline
+ .apply(Create.of(merge(withAttributes, withPayloadBytes)))
+ .setRowSchema(merge(withAttributesSchema, withPayloadBytesSchema));
+ PubsubMessage a1t0p1bu0Message =
+ new PubsubMessage(
+ bytes,
+ ImmutableMap.of("aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, mockTimestampString));
+ PAssert.that(
+ "expected: raw bytes, attributes: embedded attributes and ParDo generated timestamp",
+ a1t0p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+ .containsInAnyOrder(a1t0p1bu0Message);
+
+ PCollection<Row> a0t1p1bu0 =
+ pipeline
+ .apply(Create.of(merge(withEmbeddedTimestamp, withPayloadBytes)))
+ .setRowSchema(merge(withTimestampSchema, withPayloadBytesSchema));
+ PubsubMessage a0t1p1bu0Message =
+ new PubsubMessage(
+ bytes, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+ PAssert.that(
+ "expected: payload: raw bytes, attributes: timestamp from row input",
+ a0t1p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+ .containsInAnyOrder(a0t1p1bu0Message);
+
+ PCollection<Row> a1t1p1bu0 =
+ pipeline
+ .apply(Create.of(merge(merge(withAttributes, withEmbeddedTimestamp), withPayloadBytes)))
+ .setRowSchema(
+ merge(merge(withAttributesSchema, withTimestampSchema), withPayloadBytesSchema));
+ PubsubMessage a1t1p1bu0Message =
+ new PubsubMessage(
+ bytes,
+ ImmutableMap.of(
+ "aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+ PAssert.that(
+ "expected: payload: raw bytes, attributes: from row input including its embedded timestamp",
+ a1t1p1bu0.apply(transformWithoutSerializer).get(OUTPUT))
+ .containsInAnyOrder(a1t1p1bu0Message);
+
+ PCollection<Row> a1t1p1ru0 =
+ pipeline
+ .apply(Create.of(merge(merge(withAttributes, withEmbeddedTimestamp), withPayloadRow)))
+ .setRowSchema(
+ merge(merge(withAttributesSchema, withTimestampSchema), withPayloadRowSchema));
+ PubsubMessage a1t1p1ru0Message =
+ new PubsubMessage(
+ serializedWithAllDataTypes,
+ ImmutableMap.of(
+ "aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+ PAssert.that(
+ "expected: payload: serialized row payload input, attributes: from row input including its embedded timestamp",
+ a1t1p1ru0.apply(transformWithSerializer).get(OUTPUT))
+ .containsInAnyOrder(a1t1p1ru0Message);
+
+ PCollection<Row> a1t1p0u1 =
+ pipeline
+ .apply(Create.of(merge(merge(withAttributes, withEmbeddedTimestamp), withAllDataTypes)))
+ .setRowSchema(
+ merge(merge(withAttributesSchema, withTimestampSchema), ALL_DATA_TYPES_SCHEMA));
+ PubsubMessage a1t1p0u1Message =
+ new PubsubMessage(
+ serializedWithAllDataTypes,
+ ImmutableMap.of(
+ "aaa", "111", DEFAULT_EVENT_TIMESTAMP_KEY_NAME, embeddedTimestampString));
+ PAssert.that(
+ "expected: payload: serialized user fields, attributes: from row input including embedded timestamp",
+ a1t1p0u1.apply(transformWithSerializer).get(OUTPUT))
+ .containsInAnyOrder(a1t1p0u1Message);
+
+ pipeline.run(PIPELINE_OPTIONS);
+ }
+
+  @Test
+  public void testKeyPrefix() {
+    // Without an explicit prefix, the key names equal the DEFAULT_* constants.
+    PubsubRowToMessage defaults = PubsubRowToMessage.builder().build();
+    assertEquals(DEFAULT_ATTRIBUTES_KEY_NAME, defaults.getAttributesKeyName());
+    assertEquals(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, defaults.getSourceEventTimestampKeyName());
+    assertEquals(DEFAULT_PAYLOAD_KEY_NAME, defaults.getPayloadKeyName());
+
+    // An overridden prefix is prepended to each base key name.
+    String prefix = "_";
+    PubsubRowToMessage prefixed = PubsubRowToMessage.builder().setKeyPrefix(prefix).build();
+    assertEquals(prefix + ATTRIBUTES_KEY_NAME, prefixed.getAttributesKeyName());
+    assertEquals(prefix + EVENT_TIMESTAMP_KEY_NAME, prefixed.getSourceEventTimestampKeyName());
+    assertEquals(prefix + PAYLOAD_KEY_NAME, prefixed.getPayloadKeyName());
+  }
+
+  @Test
+  public void testSetTargetTimestampAttributeName() {
+    Instant fixedInstant = Instant.now();
+    String fixedInstantString = fixedInstant.toString();
+    byte[] payload = new byte[] {1, 2, 3, 4};
+
+    Schema payloadOnlySchema = Schema.of(Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES));
+    Row payloadOnlyRow = Row.withSchema(payloadOnlySchema).attachValues(payload);
+
+    String customAttributeName = "custom_timestamp_key";
+
+    // Both transforms mock the same instant; they differ only in the timestamp attribute key.
+    PubsubRowToMessage defaultKeyTransform =
+        PubsubRowToMessage.builder().setMockInstant(fixedInstant).build();
+    PubsubRowToMessage customKeyTransform =
+        PubsubRowToMessage.builder()
+            .setMockInstant(fixedInstant)
+            .setTargetTimestampAttributeName(customAttributeName)
+            .build();
+
+    PCollection<Row> input =
+        pipeline.apply(Create.of(payloadOnlyRow)).setRowSchema(payloadOnlySchema);
+
+    // Unset target name: the timestamp is written under DEFAULT_EVENT_TIMESTAMP_KEY_NAME.
+    PubsubMessage expectedUnderDefaultKey =
+        new PubsubMessage(
+            payload, ImmutableMap.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, fixedInstantString));
+    PAssert.that(input.apply(defaultKeyTransform).get(OUTPUT))
+        .containsInAnyOrder(expectedUnderDefaultKey);
+
+    // Custom target name: the timestamp is written under that key instead.
+    PubsubMessage expectedUnderCustomKey =
+        new PubsubMessage(payload, ImmutableMap.of(customAttributeName, fixedInstantString));
+    PAssert.that(input.apply(customKeyTransform).get(OUTPUT))
+        .containsInAnyOrder(expectedUnderCustomKey);
+
+    pipeline.run(PIPELINE_OPTIONS);
+  }
+
+  @Test
+  public void testInputSchemaFactory() {
+    Field attributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
+    Field timestampField = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
+    PubsubRowToMessage withDefaults = PubsubRowToMessage.builder().build();
+
+    // No payload type: only the attributes and event timestamp fields.
+    Schema expectedWithoutPayload = Schema.of(attributesField, timestampField);
+    assertEquals(expectedWithoutPayload, withDefaults.inputSchemaFactory().buildSchema());
+
+    // A BYTES payload type appends a BYTES payload field.
+    Schema expectedWithBytesPayload =
+        Schema.of(
+            attributesField, timestampField, Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES));
+    assertEquals(
+        expectedWithBytesPayload,
+        withDefaults.inputSchemaFactory(FieldType.BYTES).buildSchema());
+
+    // A ROW payload type appends a nested row payload field.
+    FieldType rowPayloadType = FieldType.row(ALL_DATA_TYPES_SCHEMA);
+    Schema expectedWithRowPayload =
+        Schema.of(
+            attributesField, timestampField, Field.of(DEFAULT_PAYLOAD_KEY_NAME, rowPayloadType));
+    assertEquals(
+        expectedWithRowPayload, withDefaults.inputSchemaFactory(rowPayloadType).buildSchema());
+
+    // A key prefix is applied to the generated field names.
+    String prefix = "_";
+    PubsubRowToMessage withPrefix = PubsubRowToMessage.builder().setKeyPrefix(prefix).build();
+    Schema expectedPrefixed =
+        Schema.of(
+            Field.of(prefix + ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE),
+            Field.of(prefix + EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE));
+    assertEquals(expectedPrefixed, withPrefix.inputSchemaFactory().buildSchema());
+
+    // User fields passed to buildSchema are appended after the non-user fields.
+    Field[] userFields = ALL_DATA_TYPES_SCHEMA.getFields().toArray(new Field[0]);
+    assertEquals(
+        merge(expectedWithoutPayload, ALL_DATA_TYPES_SCHEMA),
+        withDefaults.inputSchemaFactory().buildSchema(userFields));
+  }
+
+ @Test
+ public void testValidate() {
+ PubsubRowToMessage pubsubRowToMessage = PubsubRowToMessage.builder().build();
+ PayloadSerializer jsonPayloadSerializer =
+ new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> pubsubRowToMessage.validate(Schema.builder().build()));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ pubsubRowToMessage.validate(
+ Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.STRING))));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ pubsubRowToMessage.validate(
+ Schema.of(
+ Field.of(
+ DEFAULT_ATTRIBUTES_KEY_NAME,
+ FieldType.map(FieldType.STRING, FieldType.BYTES)))));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ pubsubRowToMessage.validate(
+ Schema.of(
+ Field.of(
+ DEFAULT_ATTRIBUTES_KEY_NAME,
+ FieldType.map(FieldType.BYTES, FieldType.STRING)))));
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ pubsubRowToMessage.validate(
+ Schema.of(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, FieldType.STRING))));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> PubsubRowToMessage.builder().build().validate(ALL_DATA_TYPES_SCHEMA));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> PubsubRowToMessage.builder().build().validate(NON_USER_WITH_ROW_PAYLOAD));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PubsubRowToMessage.builder()
+ .setPayloadSerializer(jsonPayloadSerializer)
+ .build()
+ .validate(NON_USER_WITH_BYTES_PAYLOAD));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PubsubRowToMessage.builder()
+ .build()
+ .validate(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_BYTES_PAYLOAD)));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PubsubRowToMessage.builder()
+ .build()
+ .validate(merge(ALL_DATA_TYPES_SCHEMA, NON_USER_WITH_ROW_PAYLOAD)));
+ }
+
+ @Test
+ public void testValidateAttributesField() {
+ PubsubRowToMessage pubsubRowToMessage = PubsubRowToMessage.builder().build();
+ pubsubRowToMessage.validateAttributesField(ALL_DATA_TYPES_SCHEMA);
+ pubsubRowToMessage.validateAttributesField(
+ Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE)));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ pubsubRowToMessage.validateAttributesField(
+ Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.STRING))));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ pubsubRowToMessage.validateAttributesField(
+ Schema.of(
+ Field.of(
+ DEFAULT_ATTRIBUTES_KEY_NAME,
+ FieldType.map(FieldType.STRING, FieldType.BYTES)))));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ pubsubRowToMessage.validateAttributesField(
+ Schema.of(
+ Field.of(
+ DEFAULT_ATTRIBUTES_KEY_NAME,
+ FieldType.map(FieldType.BYTES, FieldType.STRING)))));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PubsubRowToMessage.builder().build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA));
+ }
+
+  @Test
+  public void testValidateSourceEventTimeStampField() {
+    PubsubRowToMessage transform = PubsubRowToMessage.builder().build();
+
+    // Accepted: user fields only, and a timestamp field of EVENT_TIMESTAMP_FIELD_TYPE.
+    Schema correctlyTyped =
+        Schema.of(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE));
+    transform.validateSourceEventTimeStampField(ALL_DATA_TYPES_SCHEMA);
+    transform.validateSourceEventTimeStampField(correctlyTyped);
+
+    // Rejected: a timestamp field typed as STRING.
+    Schema stringTyped = Schema.of(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, FieldType.STRING));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> transform.validateSourceEventTimeStampField(stringTyped));
+  }
+
+ @Test
+ public void testShouldValidatePayloadSerializerNullState() {
+ PayloadSerializer jsonPayloadSerializer =
+ new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+ PayloadSerializer avroPayloadSerializer =
+ new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+ PubsubRowToMessage.builder()
+ .setPayloadSerializer(jsonPayloadSerializer)
+ .build()
+ .validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
+
+ PubsubRowToMessage.builder()
+ .setPayloadSerializer(avroPayloadSerializer)
+ .build()
+ .validateSerializableFields(ALL_DATA_TYPES_SCHEMA);
+
+ PubsubRowToMessage.builder()
+ .setPayloadSerializer(jsonPayloadSerializer)
+ .build()
+ .validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD);
+
+ PubsubRowToMessage.builder()
+ .setPayloadSerializer(avroPayloadSerializer)
+ .build()
+ .validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD);
+
+ PubsubRowToMessage.builder().build().validateSerializableFields(NON_USER_WITH_BYTES_PAYLOAD);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PubsubRowToMessage.builder().build().validateSerializableFields(ALL_DATA_TYPES_SCHEMA));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PubsubRowToMessage.builder()
+ .build()
+ .validateSerializableFields(NON_USER_WITH_ROW_PAYLOAD));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ PubsubRowToMessage.builder()
+ .setPayloadSerializer(jsonPayloadSerializer)
+ .build()
+ .validateSerializableFields(NON_USER_WITH_BYTES_PAYLOAD));
+ }
+
+  @Test
+  public void testShouldEitherHavePayloadOrUserFields() {
+    // A schema carrying only the event timestamp has neither a payload field nor user fields.
+    Schema timestampOnly =
+        Schema.of(Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE));
+    PubsubRowToMessage transform = PubsubRowToMessage.builder().build();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> transform.validateSerializableFields(timestampOnly));
+  }
+
+  @Test
+  public void testShouldNotAllowPayloadFieldWithUserFields() {
+    PubsubRowToMessage transform = PubsubRowToMessage.builder().build();
+    // Whether the payload is BYTES or ROW, combining it with user fields is rejected.
+    for (Schema payloadSchema :
+        new Schema[] {NON_USER_WITH_BYTES_PAYLOAD, NON_USER_WITH_ROW_PAYLOAD}) {
+      assertThrows(
+          IllegalArgumentException.class,
+          () -> transform.validateSerializableFields(merge(ALL_DATA_TYPES_SCHEMA, payloadSchema)));
+    }
+  }
+
+  @Test
+  public void testSchemaReflection_matchesAll() {
+    SchemaReflection schemaReflection = SchemaReflection.of(ALL_DATA_TYPES_SCHEMA);
+
+    // Parallel arrays: index i holds a field and its expected FieldType and TypeName.
+    Field[] fields = {
+      BOOLEAN_FIELD, BYTE_FIELD, DATETIME_FIELD, DECIMAL_FIELD, DOUBLE_FIELD,
+      FLOAT_FIELD, INT16_FIELD, INT32_FIELD, INT64_FIELD, STRING_FIELD
+    };
+    FieldType[] fieldTypes = {
+      FieldType.BOOLEAN, FieldType.BYTE, FieldType.DATETIME, FieldType.DECIMAL, FieldType.DOUBLE,
+      FieldType.FLOAT, FieldType.INT16, FieldType.INT32, FieldType.INT64, FieldType.STRING
+    };
+    TypeName[] typeNames = {
+      TypeName.BOOLEAN, TypeName.BYTE, TypeName.DATETIME, TypeName.DECIMAL, TypeName.DOUBLE,
+      TypeName.FLOAT, TypeName.INT16, TypeName.INT32, TypeName.INT64, TypeName.STRING
+    };
+
+    int count = fields.length;
+    FieldMatcher[] byName = new FieldMatcher[count];
+    FieldMatcher[] byFieldType = new FieldMatcher[count];
+    FieldMatcher[] byTypeName = new FieldMatcher[count];
+    for (int i = 0; i < count; i++) {
+      String name = fields[i].getName();
+      byName[i] = FieldMatcher.of(name);
+      byFieldType[i] = FieldMatcher.of(name, fieldTypes[i]);
+      byTypeName[i] = FieldMatcher.of(name, typeNames[i]);
+    }
+
+    assertTrue(schemaReflection.matchesAll(byName));
+    assertTrue(schemaReflection.matchesAll(byFieldType));
+    assertTrue(schemaReflection.matchesAll(byTypeName));
+
+    // A matcher for a field name absent from the schema makes the whole set fail.
+    FieldMatcher[] withUnknownName = new FieldMatcher[count + 1];
+    withUnknownName[0] = FieldMatcher.of("idontexist");
+    System.arraycopy(byName, 0, withUnknownName, 1, count);
+    assertFalse(schemaReflection.matchesAll(withUnknownName));
+
+    // Swapping only the final matcher's expected type (STRING_FIELD) is enough to fail.
+    FieldMatcher[] wrongFieldType = byFieldType.clone();
+    wrongFieldType[count - 1] = FieldMatcher.of(STRING_FIELD.getName(), FieldType.BYTE);
+    assertFalse(schemaReflection.matchesAll(wrongFieldType));
+
+    FieldMatcher[] wrongTypeName = byTypeName.clone();
+    wrongTypeName[count - 1] = FieldMatcher.of(STRING_FIELD.getName(), TypeName.INT16);
+    assertFalse(schemaReflection.matchesAll(wrongTypeName));
+  }
+
+  @Test
+  public void testFieldMatcher_match_NameOnly() {
+    // A name-only matcher ignores the field type, so both ROW and BYTES payloads match,
+    // while a schema lacking the payload field does not.
+    FieldMatcher payloadByName = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME);
+    assertTrue(payloadByName.match(NON_USER_WITH_ROW_PAYLOAD));
+    assertTrue(payloadByName.match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(payloadByName.match(ALL_DATA_TYPES_SCHEMA));
+  }
+
+  @Test
+  public void testFieldMatcher_match_TypeName() {
+    // A TypeName matcher only matches a payload field of that type name.
+    FieldMatcher rowPayload = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW);
+    FieldMatcher bytesPayload = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.BYTES);
+
+    assertTrue(rowPayload.match(NON_USER_WITH_ROW_PAYLOAD));
+    assertTrue(bytesPayload.match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(rowPayload.match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(bytesPayload.match(NON_USER_WITH_ROW_PAYLOAD));
+  }
+
+  @Test
+  public void testFieldMatcher_match_FieldType() {
+    // A FieldType matcher compares the full type, including map key and value types.
+    FieldMatcher stringToString =
+        FieldMatcher.of(
+            DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.map(FieldType.STRING, FieldType.STRING));
+    FieldMatcher stringToBytes =
+        FieldMatcher.of(
+            DEFAULT_ATTRIBUTES_KEY_NAME, FieldType.map(FieldType.STRING, FieldType.BYTES));
+
+    assertTrue(stringToString.match(NON_USER_WITH_BYTES_PAYLOAD));
+    assertFalse(stringToBytes.match(NON_USER_WITH_BYTES_PAYLOAD));
+  }
+
+  @Test
+  public void testRemoveFields() {
+    // Removing every field by name leaves an empty schema.
+    Schema threeFields = Schema.of(STRING_FIELD, BOOLEAN_FIELD, INT64_FIELD);
+    Schema emptied =
+        removeFields(
+            threeFields, STRING_FIELD.getName(), BOOLEAN_FIELD.getName(), INT64_FIELD.getName());
+    assertEquals(Schema.builder().build(), emptied);
+  }
+
+  @Test
+  public void testPubsubRowToMessageParDo_attributesWithoutTimestamp() {
+    Map<String, String> attributes = new HashMap<>(ImmutableMap.of("a", "1", "b", "2", "c", "3"));
+
+    // Attributes stored in the row come back unchanged.
+    Row rowWithAttributes =
+        Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+            .addValues(attributes, Instant.now(), new byte[] {})
+            .build();
+    assertEquals(
+        attributes,
+        doFn(NON_USER_WITH_BYTES_PAYLOAD, null).attributesWithoutTimestamp(rowWithAttributes));
+
+    // With the attributes field removed from the schema, the result is an empty map.
+    Schema noAttributesSchema =
+        removeFields(NON_USER_WITH_BYTES_PAYLOAD, DEFAULT_ATTRIBUTES_KEY_NAME);
+    Row rowWithoutAttributes =
+        Row.withSchema(noAttributesSchema).addValues(Instant.now(), new byte[] {}).build();
+    assertEquals(
+        ImmutableMap.of(),
+        doFn(noAttributesSchema, null).attributesWithoutTimestamp(rowWithoutAttributes));
+  }
+
+  @Test
+  public void testPubsubRowToMessageParDo_timestamp() {
+    // A timestamp embedded in the row is returned as-is (compared via its string form).
+    Instant embedded = Instant.now();
+    Row rowWithTimestamp =
+        Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+            .addValues(ImmutableMap.of(), embedded, new byte[] {})
+            .build();
+    ReadableDateTime fromRow = doFn(NON_USER_WITH_BYTES_PAYLOAD, null).timestamp(rowWithTimestamp);
+    assertEquals(embedded.toString(), fromRow.toString());
+
+    // Without a timestamp field, the DoFn still yields a non-null timestamp.
+    Schema noTimestampSchema =
+        removeFields(NON_USER_WITH_BYTES_PAYLOAD, DEFAULT_EVENT_TIMESTAMP_KEY_NAME);
+    Row rowWithoutTimestamp =
+        Row.withSchema(noTimestampSchema).addValues(ImmutableMap.of(), new byte[] {}).build();
+    ReadableDateTime generated = doFn(noTimestampSchema, null).timestamp(rowWithoutTimestamp);
+    assertNotNull(generated);
+  }
+
+  @Test
+  public void testPubsubRowToMessageParDo_payload() {
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+    // Must come from the Avro provider so the Avro assertions below exercise a different
+    // serialization format from the JSON ones, rather than repeating the JSON checks.
+    PayloadSerializer avroPayloadSerializer =
+        new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    // A BYTES payload is returned unchanged when no serializer is configured.
+    byte[] bytes = "abcdefg".getBytes(StandardCharsets.UTF_8);
+    assertArrayEquals(
+        bytes,
+        doFn(NON_USER_WITH_BYTES_PAYLOAD, null)
+            .payload(
+                Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+                    .addValues(ImmutableMap.of(), Instant.now(), bytes)
+                    .build()));
+
+    // A ROW payload is serialized with the configured serializer.
+    Row withoutUserFields =
+        Row.withSchema(NON_USER_WITH_ROW_PAYLOAD)
+            .addValues(
+                ImmutableMap.of(),
+                Instant.now(),
+                rowWithAllDataTypes(
+                    true,
+                    (byte) 1,
+                    Instant.now().toDateTime(),
+                    BigDecimal.valueOf(100L),
+                    1.12345,
+                    1.1f,
+                    (short) 1,
+                    1,
+                    1L,
+                    "abcdefg"))
+            .build();
+
+    assertArrayEquals(
+        jsonPayloadSerializer.serialize(withoutUserFields.getRow(DEFAULT_PAYLOAD_KEY_NAME)),
+        doFn(NON_USER_WITH_ROW_PAYLOAD, jsonPayloadSerializer).payload(withoutUserFields));
+
+    assertArrayEquals(
+        avroPayloadSerializer.serialize(withoutUserFields.getRow(DEFAULT_PAYLOAD_KEY_NAME)),
+        doFn(NON_USER_WITH_ROW_PAYLOAD, avroPayloadSerializer).payload(withoutUserFields));
+
+    // User fields next to attributes and timestamp are serialized as a row of only the user
+    // fields.
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("a", "1");
+    attributes.put("b", "2");
+    attributes.put("c", "3");
+
+    Row withAttributesAndTimestamp =
+        Row.withSchema(NON_USER_WITHOUT_PAYLOAD).addValues(attributes, Instant.now()).build();
+
+    Row withUserFields =
+        rowWithAllDataTypes(
+            false,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(200L),
+            2.12345,
+            2.1f,
+            (short) 2,
+            2,
+            2L,
+            "1234567");
+
+    Row withUserFieldsAndAttributesTimestamp = merge(withAttributesAndTimestamp, withUserFields);
+
+    assertArrayEquals(
+        jsonPayloadSerializer.serialize(withUserFields),
+        doFn(withUserFieldsAndAttributesTimestamp.getSchema(), jsonPayloadSerializer)
+            .payload(withUserFieldsAndAttributesTimestamp));
+
+    assertArrayEquals(
+        avroPayloadSerializer.serialize(withUserFields),
+        doFn(withUserFieldsAndAttributesTimestamp.getSchema(), avroPayloadSerializer)
+            .payload(withUserFieldsAndAttributesTimestamp));
+  }
+
+  @Test
+  public void testPubsubRowToMessageParDo_serializableRow() {
+    // A BYTES payload offers no row to serialize.
+    PubsubRowToMessageDoFn bytesPayloadDoFn = doFn(NON_USER_WITH_BYTES_PAYLOAD, null);
+    Row bytesPayloadRow =
+        Row.withSchema(NON_USER_WITH_BYTES_PAYLOAD)
+            .attachValues(ImmutableMap.of(), Instant.now(), new byte[] {});
+    assertThrows(
+        IllegalArgumentException.class, () -> bytesPayloadDoFn.serializableRow(bytesPayloadRow));
+
+    Map<String, String> attributes = new HashMap<>(ImmutableMap.of("a", "1", "b", "2", "c", "3"));
+
+    Row userRow =
+        rowWithAllDataTypes(
+            true,
+            (byte) 0,
+            Instant.now().toDateTime(),
+            BigDecimal.valueOf(1L),
+            3.12345,
+            4.1f,
+            (short) 5,
+            2,
+            7L,
+            "asdfjkl;");
+
+    PayloadSerializer jsonPayloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
+
+    // A ROW payload is itself the serializable row.
+    Row rowPayloadInput =
+        Row.withSchema(NON_USER_WITH_ROW_PAYLOAD).attachValues(attributes, Instant.now(), userRow);
+    assertEquals(
+        userRow,
+        doFn(NON_USER_WITH_ROW_PAYLOAD, jsonPayloadSerializer).serializableRow(rowPayloadInput));
+
+    // User fields next to attributes and timestamp are extracted into the serializable row.
+    Row nonUserFields =
+        Row.withSchema(NON_USER_WITHOUT_PAYLOAD).addValues(attributes, Instant.now()).build();
+    Row userFieldsInput = merge(nonUserFields, userRow);
+    assertEquals(
+        userRow,
+        doFn(userFieldsInput.getSchema(), jsonPayloadSerializer).serializableRow(userFieldsInput));
+  }
+
+  /**
+   * Creates a {@link PubsubRowToMessageDoFn} for {@code schema} using the default attributes,
+   * event timestamp and payload key names, the error schema derived from {@code schema}, and the
+   * given (possibly null) payload serializer.
+   */
+  private static PubsubRowToMessageDoFn doFn(Schema schema, PayloadSerializer payloadSerializer) {
+    Schema errors = errorSchema(schema);
+    // NOTE(review): the fifth argument reuses the default event timestamp key, presumably as the
+    // target timestamp attribute name; the null is presumably the absent mock instant. Confirm
+    // against the PubsubRowToMessageDoFn constructor.
+    return new PubsubRowToMessageDoFn(
+        DEFAULT_ATTRIBUTES_KEY_NAME,
+        DEFAULT_EVENT_TIMESTAMP_KEY_NAME,
+        DEFAULT_PAYLOAD_KEY_NAME,
+        errors,
+        DEFAULT_EVENT_TIMESTAMP_KEY_NAME,
+        null,
+        payloadSerializer);
+  }
+
+  /**
+   * Builds a row of {@code ALL_DATA_TYPES_SCHEMA}. The values are attached positionally, in
+   * parameter order.
+   */
+  private static Row rowWithAllDataTypes(
+      boolean boolean0,
+      byte byte0,
+      ReadableDateTime datetime,
+      BigDecimal decimal,
+      Double double0,
+      Float float0,
+      Short int16,
+      Integer int32,
+      Long int64,
+      String string) {
+    Object[] positionalValues = {
+      boolean0, byte0, datetime, decimal, double0, float0, int16, int32, int64, string
+    };
+    Row.Builder builder = Row.withSchema(ALL_DATA_TYPES_SCHEMA);
+    return builder.addValues(positionalValues).build();
+  }
+
+  /** Returns a schema containing {@code a}'s fields followed by {@code b}'s fields, in order. */
+  private static Schema merge(Schema a, Schema b) {
+    return Schema.builder().addFields(a.getFields()).addFields(b.getFields()).build();
+  }
+
+  /**
+   * Combines two rows into one whose schema is {@code a}'s fields followed by {@code b}'s fields.
+   * Each value is copied by field name.
+   */
+  private static Row merge(Row a, Row b) {
+    Schema combinedSchema = merge(a.getSchema(), b.getSchema());
+    Map<String, Object> valuesByName = new HashMap<>();
+    for (Row source : new Row[] {a, b}) {
+      for (String fieldName : source.getSchema().getFieldNames()) {
+        valuesByName.put(fieldName, source.getValue(fieldName));
+      }
+    }
+    return Row.withSchema(combinedSchema).withFieldValues(valuesByName).build();
+  }
+}