You are viewing a plain-text version of this content. The canonical version is available via the original "here" link, which is not preserved in this plain-text view.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/28 17:52:58 UTC

[beam] branch master updated: Implement PubsubSchemaTransformMessageToFactory

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dd3944d3b2e Implement PubsubSchemaTransformMessageToFactory
     new cf4dec9f0a5 Merge pull request #22070 from Fix PubsubSchemaTransformMessageToFactory
dd3944d3b2e is described below

commit dd3944d3b2e193f5ecfb6a1534ddb5b373d7a8bf
Author: Damon Douglas <da...@google.com>
AuthorDate: Mon Jun 27 22:45:52 2022 +0000

    Implement PubsubSchemaTransformMessageToFactory
---
 .../PubsubSchemaTransformMessageToRowFactory.java  | 24 ++++++++++---------
 ...bsubSchemaTransformMessageToRowFactoryTest.java | 27 ++++++++++------------
 2 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
index 1de99e74cf9..0ee2ac433ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
@@ -21,15 +21,14 @@ import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FI
 import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
 import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.checkerframework.checker.nullness.qual.NonNull;
 
 /**
  * Builds a {@link PubsubMessageToRow} from a {@link PubsubSchemaTransformReadConfiguration}.
@@ -62,7 +61,7 @@ class PubsubSchemaTransformMessageToRowFactory {
    * PubsubSchemaTransformReadConfiguration}.
    */
   static PubsubSchemaTransformMessageToRowFactory from(
-      @NonNull PubsubSchemaTransformReadConfiguration configuration) {
+      PubsubSchemaTransformReadConfiguration configuration) {
     return new PubsubSchemaTransformMessageToRowFactory(configuration);
   }
 
@@ -71,7 +70,9 @@ class PubsubSchemaTransformMessageToRowFactory {
     PubsubMessageToRow.Builder builder =
         PubsubMessageToRow.builder()
             .messageSchema(configuration.getDataSchema())
-            .useDlq(!Strings.isNullOrEmpty(configuration.getDeadLetterQueue()))
+            .useDlq(
+                configuration.getDeadLetterQueue() != null
+                    && !configuration.getDeadLetterQueue().isEmpty())
             .useFlatSchema(!shouldUseNestedSchema());
 
     if (needsSerializer()) {
@@ -92,25 +93,26 @@ class PubsubSchemaTransformMessageToRowFactory {
     Schema schema = configuration.getDataSchema();
     String format = DEFAULT_FORMAT;
 
-    if (!Strings.isNullOrEmpty(configuration.getFormat())) {
+    if (configuration.getFormat() != null && !configuration.getFormat().isEmpty()) {
       format = configuration.getFormat();
     }
 
-    ImmutableMap.Builder<String, Object> params = ImmutableMap.builder();
+    Map<String, Object> params = new HashMap<>();
 
-    if (!Strings.isNullOrEmpty(configuration.getThriftClass())) {
+    if (configuration.getThriftClass() != null && !configuration.getThriftClass().isEmpty()) {
       params.put(THRIFT_CLASS_KEY, configuration.getThriftClass());
     }
 
-    if (!Strings.isNullOrEmpty(configuration.getThriftProtocolFactoryClass())) {
+    if (configuration.getThriftProtocolFactoryClass() != null
+        && !configuration.getThriftProtocolFactoryClass().isEmpty()) {
       params.put(THRIFT_PROTOCOL_FACTORY_CLASS_KEY, configuration.getThriftProtocolFactoryClass());
     }
 
-    if (!Strings.isNullOrEmpty(configuration.getProtoClass())) {
+    if (configuration.getProtoClass() != null && !configuration.getProtoClass().isEmpty()) {
       params.put(PROTO_CLASS_KEY, configuration.getProtoClass());
     }
 
-    return PayloadSerializers.getSerializer(format, schema, params.build());
+    return PayloadSerializers.getSerializer(format, schema, params);
   }
 
   PubsubMessageToRow.SerializerProvider serializer() {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
index 35f8500946f..f5dc74ecf40 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
@@ -20,21 +20,21 @@ package org.apache.beam.sdk.io.gcp.pubsub;
 import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
 import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Map.Entry;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider;
 import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -176,7 +176,7 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
         continue;
       }
 
-      Row serializerInput = Objects.requireNonNull(testCase.serializerInput);
+      Row serializerInput = testCase.serializerInput;
 
       byte[] expectedBytes =
           testCase
@@ -246,13 +246,11 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
   public void fieldPresent() {
     for (TestCase testCase : cases) {
       PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-      for (Map.Entry<Pair<String, Schema.FieldType>, Boolean> entry :
-          testCase.shouldFieldPresent.entrySet()) {
-        Pair<String, Schema.FieldType> pair = entry.getKey();
-        boolean expected = entry.getValue();
-        boolean actual = factory.fieldPresent(pair.getKey(), pair.getValue());
+      for (Entry<String, FieldType> entry : testCase.shouldFieldPresent.entrySet()) {
 
-        assertEquals(expected, actual);
+        boolean actual = factory.fieldPresent(entry.getKey(), entry.getValue());
+
+        assertTrue(actual);
       }
     }
   }
@@ -272,7 +270,7 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
     private boolean shouldNeedSerializer = true;
     private boolean shouldSchemaHaveValidPayloadField = false;
     private boolean shouldSchemaHaveValidAttributesField = false;
-    private final Map<Pair<String, Schema.FieldType>, Boolean> shouldFieldPresent = new HashMap<>();
+    private final Map<String, FieldType> shouldFieldPresent = new HashMap<>();
 
     private Row serializerInput;
 
@@ -304,10 +302,9 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
     }
 
     PubsubMessageToRow.SerializerProvider expectSerializerProvider() {
-      ImmutableMap.Builder<String, Object> params = ImmutableMap.builder();
+      Map<String, Object> params = new HashMap<>();
       PayloadSerializer payloadSerializer =
-          expectPayloadSerializerProvider.getSerializer(
-              configuration.getDataSchema(), params.build());
+          expectPayloadSerializerProvider.getSerializer(configuration.getDataSchema(), params);
 
       return (input -> payloadSerializer);
     }
@@ -333,7 +330,7 @@ public class PubsubSchemaTransformMessageToRowFactoryTest {
     }
 
     TestCase fieldShouldBePresent(String name, Schema.FieldType expectedType) {
-      this.shouldFieldPresent.put(Pair.of(name, expectedType), true);
+      this.shouldFieldPresent.put(name, expectedType);
       return this;
     }
   }