Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/09 15:42:11 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #14971: [BEAM-12297] Add methods to PubsubIO for reading DynamicMessage

TheNeuralBit commented on a change in pull request #14971:
URL: https://github.com/apache/beam/pull/14971#discussion_r648441321



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
##########
@@ -416,6 +423,96 @@ public void after() throws IOException {
     }
   }
 
+  @Test
+  public void testProto() {
+    ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+    ImmutableList<Primitive> inputs =
+        ImmutableList.of(
+            Primitive.newBuilder().setPrimitiveInt32(42).build(),
+            Primitive.newBuilder().setPrimitiveBool(true).build(),
+            Primitive.newBuilder().setPrimitiveString("Hello, World!").build());
+    setupTestClient(inputs, coder);
+    PCollection<Primitive> read =
+        readPipeline.apply(
+            PubsubIO.readProtos(Primitive.class)
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+    PAssert.that(read).containsInAnyOrder(inputs);
+    readPipeline.run();
+  }
+
+  @Test
+  public void testProtoDynamicMessage() {
+    ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+    ImmutableList<Primitive> inputs =
+        ImmutableList.of(
+            Primitive.newBuilder().setPrimitiveInt32(42).build(),
+            Primitive.newBuilder().setPrimitiveBool(true).build(),
+            Primitive.newBuilder().setPrimitiveString("Hello, World!").build());
+    setupTestClient(inputs, coder);
+
+    ProtoDomain domain = ProtoDomain.buildFrom(Primitive.getDescriptor());
+    String name = Primitive.getDescriptor().getFullName();
+    PCollection<Primitive> read =
+        readPipeline
+            .apply(
+                PubsubIO.readProtoDynamicMessage(domain, name)
+                    .fromSubscription(SUBSCRIPTION.getPath())
+                    .withClock(CLOCK)
+                    .withClientFactory(clientFactory))
+            // DynamicMessage doesn't work well with PAssert, but if the content can be successfully
+            // converted back into the original Primitive, then that should be good enough to
+            // consider it a successful read.
+            .apply(
+                "Return To Primitive",
+                MapElements.into(TypeDescriptor.of(Primitive.class))
+                    .via(
+                        (DynamicMessage message) -> {

Review comment:
       `MapElements` does have `exceptionsInto` and [`exceptionsVia`](https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/MapElements.html#exceptionsVia-org.apache.beam.sdk.transforms.InferableFunction-) - would that work here?
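
To make the question concrete, here is a minimal plain-Java sketch of the pattern those two methods express: instead of catching exceptions inside the mapping lambda, each failure is handed to a separate handler and collected in its own output. This is not Beam code and not how Beam implements it. `FailureRoutingSketch`, `ThrowingFunction`, `Failed`, `Result`, and `mapWithFailures` are hypothetical names invented for illustration, and the string-parsing input stands in for the elided lambda body above. In Beam the equivalent chain would be `MapElements.into(...).via(...).exceptionsInto(...).exceptionsVia(...)`, which yields a result with separate `output()` and `failures()` collections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model only: none of these types are Beam classes.
final class FailureRoutingSketch {

  /** A mapping function that is allowed to throw checked exceptions. */
  interface ThrowingFunction<I, O> {
    O apply(I input) throws Exception;
  }

  /** Pairs the element that failed with the exception it caused. */
  static final class Failed<I> {
    final I element;
    final Exception exception;

    Failed(I element, Exception exception) {
      this.element = element;
      this.exception = exception;
    }
  }

  /** Holds successful outputs and mapped failures in separate lists. */
  static final class Result<O, F> {
    final List<O> output = new ArrayList<>();
    final List<F> failures = new ArrayList<>();
  }

  /**
   * Applies fn to every input. An exception thrown by fn does not abort the
   * loop; it is converted by onFailure and stored in Result.failures.
   */
  static <I, O, F> Result<O, F> mapWithFailures(
      List<I> inputs, ThrowingFunction<I, O> fn, Function<Failed<I>, F> onFailure) {
    Result<O, F> result = new Result<>();
    for (I input : inputs) {
      try {
        result.output.add(fn.apply(input));
      } catch (Exception e) {
        result.failures.add(onFailure.apply(new Failed<>(input, e)));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // The mapping lambda stays free of try/catch; failures are routed aside.
    Result<Integer, String> result =
        mapWithFailures(
            Arrays.asList("42", "oops", "7"),
            (String s) -> Integer.parseInt(s),
            failed -> failed.element + ": " + failed.exception.getClass().getSimpleName());
    System.out.println("output = " + result.output);
    System.out.println("failures = " + result.failures);
  }
}
```

If this shape fits the test, the assertion could then be made on the successful output while also checking that the failure output is empty, rather than rethrowing from inside the lambda.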




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org