You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by re...@apache.org on 2023/06/13 21:48:48 UTC
[beam] branch master updated: Merge pull request #26849: add attribute support to writeAvros and writeProtos
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d998ba3475f Merge pull request #26849: add attribute support to writeAvros and writeProtos
d998ba3475f is described below
commit d998ba3475f67f66716d54f784ba46f5efa88cd7
Author: Reuven Lax <re...@google.com>
AuthorDate: Tue Jun 13 14:48:35 2023 -0700
Merge pull request #26849: add attribute support to writeAvros and writeProtos
---
.../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 14 ++++-
.../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java | 12 ++---
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 63 ++++++++++++++++++----
3 files changed, 73 insertions(+), 16 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 4479323d66f..7c63b9023e3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -26,8 +26,10 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -72,7 +74,7 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
@Override
public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration config) {
PubsubIO.Write.Builder<byte[]> writeBuilder =
- PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayload());
+ PubsubIO.Write.newBuilder(new ParsePubsubMessageProtoAsPayloadFromWindowedValue());
if (config.topic != null) {
StaticValueProvider<String> topic = StaticValueProvider.of(config.topic);
writeBuilder.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath));
@@ -87,4 +89,14 @@ public final class ExternalWrite implements ExternalTransformRegistrar {
return writeBuilder.build();
}
}
+
+ public static class ParsePubsubMessageProtoAsPayloadFromWindowedValue
+ implements SerializableFunction<ValueInSingleWindow<byte[]>, PubsubMessage> {
+ static final ParsePubsubMessageProtoAsPayload INNER = new ParsePubsubMessageProtoAsPayload();
+
+ @Override
+ public PubsubMessage apply(ValueInSingleWindow<byte[]> input) {
+ return INNER.apply(input.getValue());
+ }
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
index c082b2007aa..26f4fb5d076 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
@@ -38,7 +38,7 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6;
private int maxPublishBatchSize;
- private SerializableFunction<InputT, PubsubMessage> formatFunction;
+ private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
@Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;
static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize)
@@ -110,7 +110,7 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
}
PreparePubsubWriteDoFn(
- SerializableFunction<InputT, PubsubMessage> formatFunction,
+ SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
@Nullable
SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
int maxPublishBatchSize) {
@@ -126,11 +126,11 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
BoundedWindow window,
PaneInfo paneInfo,
OutputReceiver<PubsubMessage> o) {
- PubsubMessage message = formatFunction.apply(element);
+ ValueInSingleWindow<InputT> valueInSingleWindow =
+ ValueInSingleWindow.of(element, ts, window, paneInfo);
+ PubsubMessage message = formatFunction.apply(valueInSingleWindow);
if (topicFunction != null) {
- message =
- message.withTopic(
- topicFunction.apply(ValueInSingleWindow.of(element, ts, window, paneInfo)).asPath());
+ message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
}
try {
validatePubsubMessageSize(message, maxPublishBatchSize);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index fbc3909ce23..e561831dc4c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -731,8 +731,9 @@ public class PubsubIO {
*/
public static Write<String> writeStrings() {
return Write.newBuilder(
- (String string) ->
- new PubsubMessage(string.getBytes(StandardCharsets.UTF_8), ImmutableMap.of()))
+ (ValueInSingleWindow<String> stringAndWindow) ->
+ new PubsubMessage(
+ stringAndWindow.getValue().getBytes(StandardCharsets.UTF_8), ImmutableMap.of()))
.setDynamicDestinations(false)
.build();
}
@@ -748,6 +749,19 @@ public class PubsubIO {
.build();
}
+ /**
+ * Returns A {@link PTransform} that writes binary encoded protobuf messages of a given type to a
+ * Google Cloud Pub/Sub stream.
+ */
+ public static <T extends Message> Write<T> writeProtos(
+ Class<T> messageClass,
+ SerializableFunction<ValueInSingleWindow<T>, Map<String, String>> attributeFn) {
+ // TODO: Like in readProtos(), stop using ProtoCoder and instead format the payload directly.
+ return Write.newBuilder(formatPayloadUsingCoder(ProtoCoder.of(messageClass), attributeFn))
+ .setDynamicDestinations(false)
+ .build();
+ }
+
/**
* Returns A {@link PTransform} that writes binary encoded Avro messages of a given type to a
* Google Cloud Pub/Sub stream.
@@ -759,6 +773,19 @@ public class PubsubIO {
.build();
}
+ /**
+ * Returns A {@link PTransform} that writes binary encoded Avro messages of a given type to a
+ * Google Cloud Pub/Sub stream.
+ */
+ public static <T> Write<T> writeAvros(
+ Class<T> clazz,
+ SerializableFunction<ValueInSingleWindow<T>, Map<String, String>> attributeFn) {
+ // TODO: Like in readAvros(), stop using AvroCoder and instead format the payload directly.
+ return Write.newBuilder(formatPayloadUsingCoder(AvroCoder.of(clazz), attributeFn))
+ .setDynamicDestinations(false)
+ .build();
+ }
+
/** Implementation of read methods. */
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@@ -1163,13 +1190,14 @@ public class PubsubIO {
abstract @Nullable String getIdAttribute();
/** The format function for input PubsubMessage objects. */
- abstract SerializableFunction<T, PubsubMessage> getFormatFn();
+ abstract SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> getFormatFn();
abstract @Nullable String getPubsubRootUrl();
abstract Builder<T> toBuilder();
- static <T> Builder<T> newBuilder(SerializableFunction<T, PubsubMessage> formatFn) {
+ static <T> Builder<T> newBuilder(
+ SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatFn) {
Builder<T> builder = new AutoValue_PubsubIO_Write.Builder<T>();
builder.setPubsubClientFactory(FACTORY);
builder.setFormatFn(formatFn);
@@ -1177,7 +1205,7 @@ public class PubsubIO {
}
static Builder<PubsubMessage> newBuilder() {
- return newBuilder(x -> x);
+ return newBuilder(x -> x.getValue());
}
@AutoValue.Builder
@@ -1199,7 +1227,8 @@ public class PubsubIO {
abstract Builder<T> setIdAttribute(String idAttribute);
- abstract Builder<T> setFormatFn(SerializableFunction<T, PubsubMessage> formatFn);
+ abstract Builder<T> setFormatFn(
+ SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatFn);
abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);
@@ -1486,11 +1515,27 @@ public class PubsubIO {
};
}
- private static <T> SerializableFunction<T, PubsubMessage> formatPayloadUsingCoder(
- Coder<T> coder) {
+ private static <T>
+ SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatPayloadUsingCoder(
+ Coder<T> coder) {
+ return input -> {
+ try {
+ return new PubsubMessage(
+ CoderUtils.encodeToByteArray(coder, input.getValue()), ImmutableMap.of());
+ } catch (CoderException e) {
+ throw new RuntimeException("Could not encode Pubsub message", e);
+ }
+ };
+ }
+
+ private static <T>
+ SerializableFunction<ValueInSingleWindow<T>, PubsubMessage> formatPayloadUsingCoder(
+ Coder<T> coder,
+ SerializableFunction<ValueInSingleWindow<T>, Map<String, String>> attributesFn) {
return input -> {
try {
- return new PubsubMessage(CoderUtils.encodeToByteArray(coder, input), ImmutableMap.of());
+ return new PubsubMessage(
+ CoderUtils.encodeToByteArray(coder, input.getValue()), attributesFn.apply(input));
} catch (CoderException e) {
throw new RuntimeException("Could not encode Pubsub message", e);
}