You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/28 17:52:58 UTC
[beam] branch master updated: Implement PubsubSchemaTransformMessageToFactory
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dd3944d3b2e Implement PubsubSchemaTransformMessageToFactory
new cf4dec9f0a5 Merge pull request #22070 from Fix PubsubSchemaTransformMessageToFactory
dd3944d3b2e is described below
commit dd3944d3b2e193f5ecfb6a1534ddb5b373d7a8bf
Author: Damon Douglas <da...@google.com>
AuthorDate: Mon Jun 27 22:45:52 2022 +0000
Implement PubsubSchemaTransformMessageToFactory
---
.../PubsubSchemaTransformMessageToRowFactory.java | 24 ++++++++++---------
...bsubSchemaTransformMessageToRowFactoryTest.java | 27 ++++++++++------------
2 files changed, 25 insertions(+), 26 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
index 1de99e74cf9..0ee2ac433ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
@@ -21,15 +21,14 @@ import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FI
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.checkerframework.checker.nullness.qual.NonNull;
/**
* Builds a {@link PubsubMessageToRow} from a {@link PubsubSchemaTransformReadConfiguration}.
@@ -62,7 +61,7 @@ class PubsubSchemaTransformMessageToRowFactory {
* PubsubSchemaTransformReadConfiguration}.
*/
static PubsubSchemaTransformMessageToRowFactory from(
- @NonNull PubsubSchemaTransformReadConfiguration configuration) {
+ PubsubSchemaTransformReadConfiguration configuration) {
return new PubsubSchemaTransformMessageToRowFactory(configuration);
}
@@ -71,7 +70,9 @@ class PubsubSchemaTransformMessageToRowFactory {
PubsubMessageToRow.Builder builder =
PubsubMessageToRow.builder()
.messageSchema(configuration.getDataSchema())
- .useDlq(!Strings.isNullOrEmpty(configuration.getDeadLetterQueue()))
+ .useDlq(
+ configuration.getDeadLetterQueue() != null
+ && !configuration.getDeadLetterQueue().isEmpty())
.useFlatSchema(!shouldUseNestedSchema());
if (needsSerializer()) {
@@ -92,25 +93,26 @@ class PubsubSchemaTransformMessageToRowFactory {
Schema schema = configuration.getDataSchema();
String format = DEFAULT_FORMAT;
- if (!Strings.isNullOrEmpty(configuration.getFormat())) {
+ if (configuration.getFormat() != null && !configuration.getFormat().isEmpty()) {
format = configuration.getFormat();
}
- ImmutableMap.Builder<String, Object> params = ImmutableMap.builder();
+ Map<String, Object> params = new HashMap<>();
- if (!Strings.isNullOrEmpty(configuration.getThriftClass())) {
+ if (configuration.getThriftClass() != null && !configuration.getThriftClass().isEmpty()) {
params.put(THRIFT_CLASS_KEY, configuration.getThriftClass());
}
- if (!Strings.isNullOrEmpty(configuration.getThriftProtocolFactoryClass())) {
+ if (configuration.getThriftProtocolFactoryClass() != null
+ && !configuration.getThriftProtocolFactoryClass().isEmpty()) {
params.put(THRIFT_PROTOCOL_FACTORY_CLASS_KEY, configuration.getThriftProtocolFactoryClass());
}
- if (!Strings.isNullOrEmpty(configuration.getProtoClass())) {
+ if (configuration.getProtoClass() != null && !configuration.getProtoClass().isEmpty()) {
params.put(PROTO_CLASS_KEY, configuration.getProtoClass());
}
- return PayloadSerializers.getSerializer(format, schema, params.build());
+ return PayloadSerializers.getSerializer(format, schema, params);
}
PubsubMessageToRow.SerializerProvider serializer() {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
index 35f8500946f..f5dc74ecf40 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
@@ -20,21 +20,21 @@ package org.apache.beam.sdk.io.gcp.pubsub;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.Map.Entry;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider;
import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -176,7 +176,7 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
continue;
}
- Row serializerInput = Objects.requireNonNull(testCase.serializerInput);
+ Row serializerInput = testCase.serializerInput;
byte[] expectedBytes =
testCase
@@ -246,13 +246,11 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
public void fieldPresent() {
for (TestCase testCase : cases) {
PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
- for (Map.Entry<Pair<String, Schema.FieldType>, Boolean> entry :
- testCase.shouldFieldPresent.entrySet()) {
- Pair<String, Schema.FieldType> pair = entry.getKey();
- boolean expected = entry.getValue();
- boolean actual = factory.fieldPresent(pair.getKey(), pair.getValue());
+ for (Entry<String, FieldType> entry : testCase.shouldFieldPresent.entrySet()) {
- assertEquals(expected, actual);
+ boolean actual = factory.fieldPresent(entry.getKey(), entry.getValue());
+
+ assertTrue(actual);
}
}
}
@@ -272,7 +270,7 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
private boolean shouldNeedSerializer = true;
private boolean shouldSchemaHaveValidPayloadField = false;
private boolean shouldSchemaHaveValidAttributesField = false;
- private final Map<Pair<String, Schema.FieldType>, Boolean> shouldFieldPresent = new HashMap<>();
+ private final Map<String, FieldType> shouldFieldPresent = new HashMap<>();
private Row serializerInput;
@@ -304,10 +302,9 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
}
PubsubMessageToRow.SerializerProvider expectSerializerProvider() {
- ImmutableMap.Builder<String, Object> params = ImmutableMap.builder();
+ Map<String, Object> params = new HashMap<>();
PayloadSerializer payloadSerializer =
- expectPayloadSerializerProvider.getSerializer(
- configuration.getDataSchema(), params.build());
+ expectPayloadSerializerProvider.getSerializer(configuration.getDataSchema(), params);
return (input -> payloadSerializer);
}
@@ -333,7 +330,7 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
}
TestCase fieldShouldBePresent(String name, Schema.FieldType expectedType) {
- this.shouldFieldPresent.put(Pair.of(name, expectedType), true);
+ this.shouldFieldPresent.put(name, expectedType);
return this;
}
}