You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by re...@apache.org on 2023/06/13 21:48:48 UTC

[beam] branch master updated: Merge pull request #26849: add attribute support to writeAvros and writeProtos

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d998ba3475f Merge pull request #26849: add attribute support to writeAvros and writeProtos
d998ba3475f is described below

commit d998ba3475f67f66716d54f784ba46f5efa88cd7
Author: Reuven Lax <re...@google.com>
AuthorDate: Tue Jun 13 14:48:35 2023 -0700

    Merge pull request #26849: add attribute support to writeAvros and writeProtos
---
 .../beam/sdk/io/gcp/pubsub/ExternalWrite.java      | 14 ++++-
 .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java  | 12 ++---
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    | 63 ++++++++++++++++++----
 3 files changed, 73 insertions(+), 16 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 4479323d66f..7c63b9023e3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -26,8 +26,10 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -72,7 +74,7 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
     @Override
     public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration config) {
       PubsubIO.Write.Builder<byte[]> writeBuilder =
-          PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayload());
+          PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayloadFromWindowedValue());
       if (config.topic != null) {
         StaticValueProvider<String> topic = StaticValueProvider.of(config.topic);
         writeBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath));
@@ -87,4 +89,14 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
       return writeBuilder.build();
     }
   }
+
+  public static class ParsePubsubMessageProtoAsPayloadFromWindowedValue
+      implements SerializableFunction<ValueInSingleWindow<byte[]>, PubsubMessage> {
+    static final ParsePubsubMessageProtoAsPayload INNER = new ParsePubsubMessageProtoAsPayload();
+
+    @Override
+    public PubsubMessage apply(ValueInSingleWindow<byte[]> input) {
+      return INNER.apply(input.getValue());
+    }
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
index c082b2007aa..26f4fb5d076 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
@@ -38,7 +38,7 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
   private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6;
   private int maxPublishBatchSize;
 
-  private SerializableFunction<InputT, PubsubMessage> formatFunction;
+  private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
   @Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;
 
   static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize)
@@ -110,7 +110,7 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
   }
 
   PreparePubsubWriteDoFn(
-      SerializableFunction<InputT, PubsubMessage> formatFunction,
+      SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
       @Nullable
           SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
       int maxPublishBatchSize) {
@@ -126,11 +126,11 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
       BoundedWindow window,
       PaneInfo paneInfo,
       OutputReceiver<PubsubMessage> o) {
-    PubsubMessage message = formatFunction.apply(element);
+    ValueInSingleWindow<InputT> valueInSingleWindow =
+        ValueInSingleWindow.of(element, ts, window, paneInfo);
+    PubsubMessage message = formatFunction.apply(valueInSingleWindow);
     if (topicFunction != null) {
-      message =
-          message.withTopic(
-              topicFunction.apply(ValueInSingleWindow.of(element, ts, window, paneInfo)).asPath());
+      message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
     }
     try {
       validatePubsubMessageSize(message, maxPublishBatchSize);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index fbc3909ce23..e561831dc4c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -731,8 +731,9 @@ public class PubsubIO {
    */
   public static Write<String> writeStrings() {
     return Write.newBuilder(
-            (String string) ->
-                new PubsubMessage(string.getBytes(StandardCharsets.UTF_8), ImmutableMap.of()))
+            (ValueInSingleWindow<String> stringAndWindow) ->
+                new PubsubMessage(
+                    stringAndWindow.getValue().getBytes(StandardCharsets.UTF_8), ImmutableMap.of()))
         .setDynamicDestinations(false)
         .build();
   }
@@ -748,6 +749,19 @@ public class PubsubIO {
         .build();
   }
 
+  /**
+   * Returns a {@link PTransform} that writes binary encoded protobuf messages of a given type to a
+   * Google Cloud Pub/Sub stream.
+   */
+  public static <T extends Message> Write<T> writeProtos(
+      Class<T> messageClass,
+      SerializableFunction<ValueInSingleWindow<T>, Map<String, String>> attributeFn) {
+    // TODO: Like in readProtos(), stop using ProtoCoder and instead format the payload directly.
+    return Write.newBuilder(formatPayloadUsingCoder(ProtoCoder.of(messageClass), attributeFn))
+        .setDynamicDestinations(false)
+        .build();
+  }
+
   /**
    * Returns A {@link PTransform} that writes binary encoded Avro messages of a given type to a
    * Google Cloud Pub/Sub stream.
@@ -759,6 +773,19 @@ public class PubsubIO {
         .build();
   }
 
+  /**
+   * Returns a {@link PTransform} that writes binary encoded Avro messages of a given type to a
+   * Google Cloud Pub/Sub stream.
+   */
+  public static <T> Write<T> writeAvros(
+      Class<T> clazz,
+      SerializableFunction<ValueInSingleWindow<T>, Map<String, String>> attributeFn) {
+    // TODO: Like in readAvros(), stop using AvroCoder and instead format the payload directly.
+    return Write.newBuilder(formatPayloadUsingCoder(AvroCoder.of(clazz), attributeFn))
+        .setDynamicDestinations(false)
+        .build();
+  }
+
   /** Implementation of read methods. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@@ -1163,13 +1190,14 @@ public class PubsubIO {
     abstract @Nullable String getIdAttribute();
 
     /** The format function for input PubsubMessage objects. */
-    abstract SerializableFunction<T, PubsubMessage> getFormatFn();
+    abstract SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> getFormatFn();
 
     abstract @Nullable String getPubsubRootUrl();
 
     abstract Builder<T> toBuilder();
 
-    static <T> Builder<T> newBuilder(SerializableFunction<T, PubsubMessage> formatFn) {
+    static <T> Builder<T> newBuilder(
+        SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatFn) {
       Builder<T> builder = new AutoValue_PubsubIO_Write.Builder<T>();
       builder.setPubsubClientFactory(FACTORY);
       builder.setFormatFn(formatFn);
@@ -1177,7 +1205,7 @@ public class PubsubIO {
     }
 
     static Builder<PubsubMessage> newBuilder() {
-      return newBuilder(x -> x);
+      return newBuilder(x -> x.getValue());
     }
 
     @AutoValue.Builder
@@ -1199,7 +1227,8 @@ public class PubsubIO {
 
       abstract Builder<T> setIdAttribute(String idAttribute);
 
-      abstract Builder<T> setFormatFn(SerializableFunction<T, PubsubMessage> formatFn);
+      abstract Builder<T> setFormatFn(
+          SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatFn);
 
       abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);
 
@@ -1486,11 +1515,27 @@ public class PubsubIO {
     };
   }
 
-  private static <T> SerializableFunction<T, PubsubMessage> formatPayloadUsingCoder(
-      Coder<T> coder) {
+  private static <T>
+      SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatPayloadUsingCoder(
+          Coder<T> coder) {
+    return input -> {
+      try {
+        return new PubsubMessage(
+            CoderUtils.encodeToByteArray(coder, input.getValue()), ImmutableMap.of());
+      } catch (CoderException e) {
+        throw new RuntimeException("Could not encode Pubsub message", e);
+      }
+    };
+  }
+
+  private static <T>
+      SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatPayloadUsingCoder(
+          Coder<T> coder,
+          SerializableFunction<ValueInSingleWindow<T>, Map<String, String>> attributesFn) {
     return input -> {
       try {
-        return new PubsubMessage(CoderUtils.encodeToByteArray(coder, input), ImmutableMap.of());
+        return new PubsubMessage(
+            CoderUtils.encodeToByteArray(coder, input.getValue()), attributesFn.apply(input));
       } catch (CoderException e) {
         throw new RuntimeException("Could not encode Pubsub message", e);
       }