You are viewing a plain-text version of this content. (The canonical link was a hyperlink that is not preserved in this plain-text version.)
Posted to commits@beam.apache.org by bo...@apache.org on 2021/04/08 23:03:59 UTC
[beam] branch master updated: Change PubSubSource and PubSubSink translation to avoid special transform overrides.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c472530 Change PubSubSource and PubSubSink translation to avoid special transform overrides.
new a48abeb Merge pull request #14384 from [BEAM-10861] Change PubSubSource and PubSubSink translation to avoid special transform overrides
c472530 is described below
commit c4725301da8f1fbc3982bca986f4d9a1b9a4ce19
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Mar 30 18:00:59 2021 -0700
Change PubSubSource and PubSubSink translation to avoid special transform overrides.
---
.../pipeline/src/main/proto/beam_runner_api.proto | 4 +
.../beam/runners/dataflow/DataflowRunner.java | 180 +-------------------
sdks/java/io/google-cloud-platform/build.gradle | 3 +-
.../io/gcp/pubsub/PubSubPayloadTranslation.java | 159 +++++++++++++++++
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 78 ++++++---
.../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 73 +++++---
.../sdk/io/gcp/pubsub/RunnerImplementedSink.java | 68 --------
.../pubsub/RunnerImplementedSinkTranslation.java | 87 ----------
.../sdk/io/gcp/pubsub/RunnerImplementedSource.java | 83 ---------
.../pubsub/RunnerImplementedSourceTranslation.java | 102 -----------
....java => PubSubReadPayloadTranslationTest.java} | 189 +++++++++++----------
...java => PubSubWritePayloadTranslationTest.java} | 37 ++--
.../sdk/io/gcp/pubsub/PubsubIOExternalTest.java | 12 +-
.../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 80 +++++++--
14 files changed, 450 insertions(+), 705 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 134fcb6..138e352 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -696,6 +696,8 @@ message WriteFilesPayload {
// with a native implementation.
// The SDK should guarantee that only one of topic, subscription,
// topic_runtime_overridden and subscription_runtime_overridden is set.
+// The output of PubSubReadPayload should be bytes of a serialized PubsubMessage
+// proto if with_attributes == true. Otherwise, the bytes are the raw payload.
message PubSubReadPayload {
// Topic to read from. Exactly one of topic or subscription should be set.
@@ -727,6 +729,8 @@ message PubSubReadPayload {
// with a native implementation.
// The SDK should guarantee that only one of topic and topic_runtime_overridden
// is set.
+// The input of PubSubWritePayload should be bytes of a serialized PubsubMessage
+// proto.
message PubSubWritePayload {
// Topic to write to.
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a06951f..cdb7e67 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -95,7 +95,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
@@ -113,12 +112,8 @@ import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSink;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -137,11 +132,9 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.Impulse;
-import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -165,7 +158,6 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -207,7 +199,6 @@ import org.slf4j.LoggerFactory;
public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
-
/** Provided configuration options. */
private final DataflowPipelineOptions options;
@@ -455,27 +446,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
}
- /** For portable jobs, Dataflow still requires an override of the PubsubIO transforms. */
- private List<PTransformOverride> getPortableOverrides() {
- ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder();
-
- if (!hasExperiment(options, "enable_custom_pubsub_source")) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
- new DataflowReadFromPubsubSourceForRunnerV2OverrideFactory()));
- }
-
- if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
- new DataflowWriteToPubsubRunnerV2OverrideFactory()));
- }
-
- return overridesBuilder.build();
- }
-
private List<PTransformOverride> getOverrides(boolean streaming) {
boolean fnApiEnabled = useUnifiedWorker(options);
ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder();
@@ -506,12 +476,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
if (streaming) {
if (!hasExperiment(options, "enable_custom_pubsub_source")) {
- if (useUnifiedWorker(options)) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
- new DataflowReadFromPubsubSourceForRunnerV2OverrideFactory()));
- } else {
+ if (!useUnifiedWorker(options)) {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
@@ -519,12 +484,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
- if (useUnifiedWorker(options)) {
- overridesBuilder.add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
- new DataflowWriteToPubsubRunnerV2OverrideFactory()));
- } else {
+ if (!useUnifiedWorker(options)) {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
@@ -989,9 +949,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
- if (useUnifiedWorker(options)) {
- pipeline.replaceAll(getPortableOverrides());
- }
RunnerApi.Pipeline portablePipelineProto =
PipelineTranslation.toProto(pipeline, portableComponents, false);
LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto));
@@ -1495,25 +1452,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- private static class DataflowReadFromPubsubSourceForRunnerV2OverrideFactory
- implements PTransformOverrideFactory<
- PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
-
- @Override
- public PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform(
- AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) {
- return PTransformReplacement.of(
- transform.getPipeline().begin(),
- new DataflowReadFromPubsubForRunnerV2(transform.getTransform()));
- }
-
- @Override
- public Map<PCollection<?>, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PCollection<?>> outputs, PCollection<PubsubMessage> newOutput) {
- return ReplacementOutputs.singleton(outputs, newOutput);
- }
- }
-
/**
* Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
* instead defer to Windmill's implementation.
@@ -1552,38 +1490,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- public static class DataflowReadFromPubsubForRunnerV2
- extends PTransform<PBegin, PCollection<PubsubMessage>> {
-
- private final PubsubUnboundedSource transform;
-
- public DataflowReadFromPubsubForRunnerV2(PubsubUnboundedSource transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollection<PubsubMessage> expand(PBegin input) {
- PCollection<byte[]> outputFromRunner =
- input.apply(
- "DataflowReadFromPubsubRunnerV2", new RunnerImplementedSource(this.transform));
-
- SerializableFunction<byte[], PubsubMessage> function;
- if (transform.getNeedsAttributes() || transform.getNeedsMessageId()) {
- function = new PubsubMessages.ParsePubsubMessageProtoAsPayload();
- } else {
- function = new DeserializeBytesIntoPubsubMessagePayloadOnly();
- }
- return outputFromRunner.apply(
- "MapBytesToPubsubMessages",
- MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function));
- }
-
- static {
- DataflowPipelineTranslator.registerTransformTranslator(
- RunnerImplementedSource.class, new DataflowPubsubSourceRunnerV2Translator());
- }
- }
-
private static void translateOverriddenPubsubSourceStep(
PubsubUnboundedSource overriddenTransform, StepTranslationContext stepTranslationContext) {
stepTranslationContext.addInput(PropertyNames.FORMAT, "pubsub");
@@ -1642,20 +1548,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- private static class DataflowPubsubSourceRunnerV2Translator
- implements TransformTranslator<RunnerImplementedSource> {
-
- @Override
- public void translate(RunnerImplementedSource transform, TranslationContext context) {
- checkArgument(
- context.getPipelineOptions().isStreaming(),
- "DataflowPubsubSourceRunnerV2Translator is only for streaming pipelines.");
- StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
- translateOverriddenPubsubSourceStep(transform.getOverriddenSource(), stepContext);
- stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
- }
- }
-
private static class IdentityMessageFn extends SimpleFunction<PubsubMessage, PubsubMessage> {
@Override
@@ -1715,20 +1607,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- /** Rewrite {@link RunnerImplementedSink} to the appropriate internal node. */
- static class DataflowRunnerV2PubsubSinkTranslator
- implements TransformTranslator<RunnerImplementedSink> {
- @Override
- public void translate(RunnerImplementedSink transform, TranslationContext context) {
- checkArgument(
- context.getPipelineOptions().isStreaming(),
- "DataflowRunnerV2PubsubSink is only for streaming pipelines.");
- StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite");
- StreamingPubsubSinkTranslators.translate(
- transform.getOverrideSink(), stepContext, context.getInput(transform));
- }
- }
-
private static void translate(
PubsubUnboundedSink overriddenTransform,
StepTranslationContext stepContext,
@@ -2206,60 +2084,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- /**
- * A replacement {@link PTransform} for {@link PubsubUnboundedSink} when using dataflow runner v2.
- */
- private static class DataflowWriteToPubsubForRunnerV2
- extends PTransform<PCollection<PubsubMessage>, PDone> {
-
- private final PubsubUnboundedSink transform;
-
- public DataflowWriteToPubsubForRunnerV2(PubsubUnboundedSink transform) {
- this.transform = transform;
- }
-
- @Override
- public PDone expand(PCollection<PubsubMessage> input) {
- input
- .apply(
- "Output Serialized PubsubMessage Proto",
- MapElements.into(new TypeDescriptor<byte[]>() {})
- .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
- .setCoder(ByteArrayCoder.of())
- .apply("Write to Runner Implemented Sink", new RunnerImplementedSink(transform));
-
- return PDone.in(input.getPipeline());
- }
-
- static {
- DataflowPipelineTranslator.registerTransformTranslator(
- RunnerImplementedSink.class,
- new StreamingPubsubSinkTranslators.DataflowRunnerV2PubsubSinkTranslator());
- }
- }
-
- /**
- * A {@link PTransformOverrideFactory} to provide replacement {@link PTransform} for {@link
- * PubsubUnboundedSink} when using dataflow runner v2.
- */
- private static class DataflowWriteToPubsubRunnerV2OverrideFactory
- implements PTransformOverrideFactory<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> {
-
- @Override
- public PTransformReplacement<PCollection<PubsubMessage>, PDone> getReplacementTransform(
- AppliedPTransform<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- new DataflowWriteToPubsubForRunnerV2(transform.getTransform()));
- }
-
- @Override
- public Map<PCollection<?>, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PCollection<?>> outputs, PDone newOutput) {
- return Collections.emptyMap();
- }
- }
-
@VisibleForTesting
static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
implements PTransformOverrideFactory<
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 4b25f56..579a352 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -23,8 +23,7 @@ applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.gcp',
enableSpotbugs: false,
classesTriggerCheckerBugs: [
- 'RunnerImplementedSourceTranslation': 'https://github.com/typetools/checker-framework/issues/3791',
- 'RunnerImplementedSinkTranslation': 'https://github.com/typetools/checker-framework/issues/3791',
+ 'PubSubPayloadTranslation': 'https://github.com/typetools/checker-framework/issues/3791',
],
)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
new file mode 100644
index 0000000..a1a2675
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+@SuppressWarnings({
+ "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/**
+ * Utility methods for translating PubSub reads and writes to {@link RunnerApi} representations:
+ * a {@link Unbounded} of {@link PubsubUnboundedSource}, and {@link PubsubUnboundedSink.PubsubSink}.
+ */
+public class PubSubPayloadTranslation {
+ static class PubSubReadPayloadTranslator
+ implements TransformPayloadTranslator<Read.Unbounded<?>> {
+
+ @Override
+ public String getUrn(Read.Unbounded<?> transform) {
+ if (!(transform.getSource() instanceof PubsubUnboundedSource.PubsubSource)) {
+ return null;
+ }
+ return PTransformTranslation.PUBSUB_READ;
+ }
+
+ @Override
+ public RunnerApi.FunctionSpec translate(
+ AppliedPTransform<?, ?, Unbounded<?>> transform, SdkComponents components) {
+ if (!(transform.getTransform().getSource() instanceof PubsubUnboundedSource.PubsubSource)) {
+ return null;
+ }
+ PubSubReadPayload.Builder payloadBuilder = PubSubReadPayload.newBuilder();
+ PubsubUnboundedSource pubsubUnboundedSource =
+ ((PubsubSource) transform.getTransform().getSource()).outer;
+ ValueProvider<TopicPath> topicProvider = pubsubUnboundedSource.getTopicProvider();
+ if (topicProvider != null) {
+ if (topicProvider.isAccessible()) {
+ payloadBuilder.setTopic(topicProvider.get().getFullPath());
+ } else {
+ payloadBuilder.setTopicRuntimeOverridden(
+ ((NestedValueProvider) topicProvider).propertyName());
+ }
+ }
+ ValueProvider<SubscriptionPath> subscriptionProvider =
+ pubsubUnboundedSource.getSubscriptionProvider();
+ if (subscriptionProvider != null) {
+ if (subscriptionProvider.isAccessible()) {
+ payloadBuilder.setSubscription(subscriptionProvider.get().getFullPath());
+ } else {
+ payloadBuilder.setSubscriptionRuntimeOverridden(
+ ((NestedValueProvider) subscriptionProvider).propertyName());
+ }
+ }
+
+ if (pubsubUnboundedSource.getTimestampAttribute() != null) {
+ payloadBuilder.setTimestampAttribute(pubsubUnboundedSource.getTimestampAttribute());
+ }
+ if (pubsubUnboundedSource.getIdAttribute() != null) {
+ payloadBuilder.setIdAttribute(pubsubUnboundedSource.getIdAttribute());
+ }
+ payloadBuilder.setWithAttributes(
+ pubsubUnboundedSource.getNeedsAttributes() || pubsubUnboundedSource.getNeedsMessageId());
+ return FunctionSpec.newBuilder()
+ .setUrn(getUrn(transform.getTransform()))
+ .setPayload(payloadBuilder.build().toByteString())
+ .build();
+ }
+ }
+
+ static class PubSubWritePayloadTranslator
+ implements TransformPayloadTranslator<PubsubUnboundedSink.PubsubSink> {
+
+ @Override
+ public String getUrn(PubsubUnboundedSink.PubsubSink transform) {
+ return PTransformTranslation.PUBSUB_WRITE;
+ }
+
+ @Override
+ public RunnerApi.FunctionSpec translate(
+ AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform,
+ SdkComponents components) {
+ PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
+ ValueProvider<TopicPath> topicProvider = transform.getTransform().outer.getTopicProvider();
+ if (topicProvider.isAccessible()) {
+ payloadBuilder.setTopic(topicProvider.get().getFullPath());
+ } else {
+ payloadBuilder.setTopicRuntimeOverridden(
+ ((NestedValueProvider) topicProvider).propertyName());
+ }
+ if (transform.getTransform().outer.getTimestampAttribute() != null) {
+ payloadBuilder.setTimestampAttribute(
+ transform.getTransform().outer.getTimestampAttribute());
+ }
+ if (transform.getTransform().outer.getIdAttribute() != null) {
+ payloadBuilder.setIdAttribute(transform.getTransform().outer.getIdAttribute());
+ }
+ return FunctionSpec.newBuilder()
+ .setUrn(getUrn(transform.getTransform()))
+ .setPayload(payloadBuilder.build().toByteString())
+ .build();
+ }
+ }
+
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
+
+ @Override
+ public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return Collections.singletonMap(
+ PubsubUnboundedSink.PubsubSink.class, new PubSubWritePayloadTranslator());
+ }
+ }
+
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
+
+ @Override
+ public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return Collections.singletonMap(Read.Unbounded.class, new PubSubReadPayloadTranslator());
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 5c9fca3..e6ce089 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
@@ -46,6 +47,7 @@ import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -141,7 +144,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
// ================================================================================
/** Convert elements to messages and shard them. */
- private static class ShardFn extends DoFn<PubsubMessage, KV<Integer, OutgoingMessage>> {
+ private static class ShardFn extends DoFn<byte[], KV<Integer, OutgoingMessage>> {
private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
private final int numShards;
private final RecordIdMethod recordIdMethod;
@@ -154,8 +157,9 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
elementCounter.inc();
- PubsubMessage message = c.element();
- byte[] elementBytes = message.getPayload();
+ com.google.pubsub.v1.PubsubMessage message =
+ com.google.pubsub.v1.PubsubMessage.parseFrom(c.element());
+ byte[] elementBytes = message.getData().toByteArray();
long timestampMsSinceEpoch = c.timestamp().getMillis();
@Nullable String recordId = null;
@@ -409,29 +413,51 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
@Override
public PDone expand(PCollection<PubsubMessage> input) {
- input
- .apply(
- "PubsubUnboundedSink.Window",
- Window.<PubsubMessage>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterFirst.of(
- AfterPane.elementCountAtLeast(publishBatchSize),
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
- .discardingFiredPanes())
- .apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod)))
- .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
- .apply(GroupByKey.create())
+ return input
.apply(
- "PubsubUnboundedSink.Writer",
- ParDo.of(
- new WriterFn(
- pubsubFactory,
- topic,
- timestampAttribute,
- idAttribute,
- publishBatchSize,
- publishBatchBytes)));
- return PDone.in(input.getPipeline());
+ "Output Serialized PubsubMessage Proto",
+ MapElements.into(new TypeDescriptor<byte[]>() {})
+ .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto()))
+ .setCoder(ByteArrayCoder.of())
+ .apply(new PubsubSink(this));
+ }
+
+ static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> {
+ public final PubsubUnboundedSink outer;
+
+ PubsubSink(PubsubUnboundedSink outer) {
+ this.outer = outer;
+ }
+
+ @Override
+ public PDone expand(PCollection<byte[]> input) {
+ input
+ .apply(
+ "PubsubUnboundedSink.Window",
+ Window.<byte[]>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterFirst.of(
+ AfterPane.elementCountAtLeast(outer.publishBatchSize),
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(outer.maxLatency))))
+ .discardingFiredPanes())
+ .apply(
+ "PubsubUnboundedSink.Shard",
+ ParDo.of(new ShardFn(outer.numShards, outer.recordIdMethod)))
+ .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
+ .apply(GroupByKey.create())
+ .apply(
+ "PubsubUnboundedSink.Writer",
+ ParDo.of(
+ new WriterFn(
+ outer.pubsubFactory,
+ outer.topic,
+ outer.timestampAttribute,
+ outer.idAttribute,
+ outer.publishBatchSize,
+ outer.publishBatchBytes)));
+ return PDone.in(input.getPipeline());
+ }
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 37d0e94..bd5e868 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
@@ -53,15 +54,19 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.SourceMetrics;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -70,6 +75,7 @@ import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -358,7 +364,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
* consumed downstream and/or ACKed back to Pubsub.
*/
@VisibleForTesting
- static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubMessage> {
+ static class PubsubReader extends UnboundedSource.UnboundedReader<byte[]> {
/** For access to topic and checkpointCoder. */
private final PubsubSource outer;
@@ -882,14 +888,14 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
}
@Override
- public PubsubMessage getCurrent() throws NoSuchElementException {
+ public byte[] getCurrent() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
- return new PubsubMessage(
- current.message().getData().toByteArray(),
- current.message().getAttributesMap(),
- current.recordId());
+ if (this.outer.outer.getNeedsMessageId() || this.outer.outer.getNeedsAttributes()) {
+ return current.message().toByteArray();
+ }
+ return current.message().getData().toByteArray();
}
@Override
@@ -1010,7 +1016,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
// ================================================================================
@VisibleForTesting
- static class PubsubSource extends UnboundedSource<PubsubMessage, PubsubCheckpoint> {
+ static class PubsubSource extends UnboundedSource<byte[], PubsubCheckpoint> {
public final PubsubUnboundedSource outer;
// The subscription to read from.
@VisibleForTesting final ValueProvider<SubscriptionPath> subscriptionPath;
@@ -1086,16 +1092,8 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
}
@Override
- public Coder<PubsubMessage> getOutputCoder() {
- if (outer.getNeedsMessageId()) {
- return outer.getNeedsAttributes()
- ? PubsubMessageWithAttributesAndMessageIdCoder.of()
- : PubsubMessageWithMessageIdCoder.of();
- } else {
- return outer.getNeedsAttributes()
- ? PubsubMessageWithAttributesCoder.of()
- : PubsubMessagePayloadOnlyCoder.of();
- }
+ public Coder<byte[]> getOutputCoder() {
+ return ByteArrayCoder.of();
}
@Override
@@ -1336,14 +1334,39 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
@Override
public PCollection<PubsubMessage> expand(PBegin input) {
- return input
- .getPipeline()
- .begin()
- .apply(Read.from(new PubsubSource(this)))
- .apply(
- "PubsubUnboundedSource.Stats",
- ParDo.of(
- new StatsFn(pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
+ SerializableFunction<byte[], PubsubMessage> function;
+ if (getNeedsAttributes() || getNeedsMessageId()) {
+ function = new PubsubMessages.ParsePubsubMessageProtoAsPayload();
+ } else {
+ function = new DeserializeBytesIntoPubsubMessagePayloadOnly();
+ }
+ PCollection<PubsubMessage> messages =
+ input
+ .getPipeline()
+ .begin()
+ .apply(Read.from(new PubsubSource(this)))
+ .apply(
+ "MapBytesToPubsubMessages",
+ MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function));
+ if (usesStatsFn(input.getPipeline().getOptions())) {
+ messages =
+ messages.apply(
+ "PubsubUnboundedSource.Stats",
+ ParDo.of(
+ new StatsFn(
+ pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
+ }
+ return messages;
+ }
+
+ private boolean usesStatsFn(PipelineOptions options) {
+ if (ExperimentalOptions.hasExperiment(options, "enable_custom_pubsub_source")) {
+ return true;
+ }
+ if (!options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
+ return true;
+ }
+ return false;
}
private SubscriptionPath createRandomSubscription(PipelineOptions options) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java
deleted file mode 100644
index 6c70b50..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * A {@link PTransform} which represents a runner implemented Pubsub sink. {@link
- * RunnerImplementedSinkTranslator} will translate this transform into well-known composite.
- */
-@Internal
-public class RunnerImplementedSink extends PTransform<PCollection<byte[]>, PDone> {
- private final PubsubUnboundedSink sink;
-
- public RunnerImplementedSink(PubsubUnboundedSink sink) {
- this.sink = sink;
- }
-
- public PubsubUnboundedSink getOverrideSink() {
- return sink;
- }
-
- public ValueProvider<TopicPath> getTopicProvider() {
- return sink.getTopicProvider();
- }
-
- public String getTimestampAttribute() {
- return sink.getTimestampAttribute();
- }
-
- public String getIdAttribute() {
- return sink.getIdAttribute();
- }
-
- @Override
- public PDone expand(PCollection<byte[]> input) {
- return PDone.in(input.getPipeline());
- }
-
- @Override
- protected String getKindString() {
- return "RunnerImplementedSink";
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java
deleted file mode 100644
index 056871f..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import com.google.auto.service.AutoService;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * Utility methods for translating a {@link RunnerImplementedSink} to {@link RunnerApi}
- * representations.
- */
-public class RunnerImplementedSinkTranslation {
- static class RunnerImplementedSinkTranslator
- implements TransformPayloadTranslator<RunnerImplementedSink> {
-
- @Override
- public String getUrn(RunnerImplementedSink transform) {
- return PTransformTranslation.PUBSUB_WRITE;
- }
-
- @Override
- public RunnerApi.FunctionSpec translate(
- AppliedPTransform<?, ?, RunnerImplementedSink> transform, SdkComponents components) {
- PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
- ValueProvider<TopicPath> topicProvider = transform.getTransform().getTopicProvider();
- if (topicProvider.isAccessible()) {
- payloadBuilder.setTopic(topicProvider.get().getFullPath());
- } else {
- payloadBuilder.setTopicRuntimeOverridden(
- ((NestedValueProvider) topicProvider).propertyName());
- }
- if (transform.getTransform().getTimestampAttribute() != null) {
- payloadBuilder.setTimestampAttribute(transform.getTransform().getTimestampAttribute());
- }
- if (transform.getTransform().getIdAttribute() != null) {
- payloadBuilder.setIdAttribute(transform.getTransform().getIdAttribute());
- }
- return FunctionSpec.newBuilder()
- .setUrn(getUrn(transform.getTransform()))
- .setPayload(payloadBuilder.build().toByteString())
- .build();
- }
- }
-
- @AutoService(TransformPayloadTranslatorRegistrar.class)
- public static class Registrar implements TransformPayloadTranslatorRegistrar {
-
- @Override
- public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
- getTransformPayloadTranslators() {
- return Collections.singletonMap(
- RunnerImplementedSink.class, new RunnerImplementedSinkTranslator());
- }
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java
deleted file mode 100644
index b1170fc..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * A {@link PTransform} which represents a runner implemented Pubsub source. {@link
- * RunnerImplementedSourceTranslator} will translate this transform into well-known composite.
- */
-@Internal
-public class RunnerImplementedSource extends PTransform<PBegin, PCollection<byte[]>> {
- private final PubsubUnboundedSource source;
-
- public RunnerImplementedSource(PubsubUnboundedSource source) {
- this.source = source;
- }
-
- public PubsubUnboundedSource getOverriddenSource() {
- return source;
- }
-
- public ValueProvider<TopicPath> getTopicProvider() {
- return source.getTopicProvider();
- }
-
- public ValueProvider<SubscriptionPath> getSubscriptionProvider() {
- return source.getSubscriptionProvider();
- }
-
- public String getTimestampAttribute() {
- return source.getTimestampAttribute();
- }
-
- public String getIdAttribute() {
- return source.getIdAttribute();
- }
-
- public boolean isWithAttributes() {
- return source.getNeedsAttributes() || source.getNeedsMessageId();
- }
-
- @Override
- public PCollection<byte[]> expand(PBegin input) {
- ByteArrayCoder coder = ByteArrayCoder.of();
- return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder)
- .setCoder(coder);
- }
-
- @Override
- protected String getKindString() {
- return "RunnerImplementedSource";
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java
deleted file mode 100644
index 5f002d0..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import com.google.auto.service.AutoService;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
-import org.apache.beam.runners.core.construction.SdkComponents;
-import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-/**
- * Utility methods for translating a {@link RunnerImplementedSource} to {@link RunnerApi}
- * representations.
- */
-public class RunnerImplementedSourceTranslation {
- static class RunnerImplementedSourceTranslator
- implements TransformPayloadTranslator<RunnerImplementedSource> {
-
- @Override
- public String getUrn(RunnerImplementedSource transform) {
- return PTransformTranslation.PUBSUB_READ;
- }
-
- @Override
- public RunnerApi.FunctionSpec translate(
- AppliedPTransform<?, ?, RunnerImplementedSource> transform, SdkComponents components) {
- PubSubReadPayload.Builder payloadBuilder = PubSubReadPayload.newBuilder();
- ValueProvider<TopicPath> topicProvider = transform.getTransform().getTopicProvider();
- if (topicProvider != null) {
- if (topicProvider.isAccessible()) {
- payloadBuilder.setTopic(topicProvider.get().getFullPath());
- } else {
- payloadBuilder.setTopicRuntimeOverridden(
- ((NestedValueProvider) topicProvider).propertyName());
- }
- }
- ValueProvider<SubscriptionPath> subscriptionProvider =
- transform.getTransform().getSubscriptionProvider();
- if (subscriptionProvider != null) {
- if (subscriptionProvider.isAccessible()) {
- payloadBuilder.setSubscription(subscriptionProvider.get().getFullPath());
- } else {
- payloadBuilder.setSubscriptionRuntimeOverridden(
- ((NestedValueProvider) subscriptionProvider).propertyName());
- }
- }
-
- if (transform.getTransform().getTimestampAttribute() != null) {
- payloadBuilder.setTimestampAttribute(transform.getTransform().getTimestampAttribute());
- }
- if (transform.getTransform().getIdAttribute() != null) {
- payloadBuilder.setIdAttribute(transform.getTransform().getIdAttribute());
- }
- payloadBuilder.setWithAttributes(transform.getTransform().isWithAttributes());
- return FunctionSpec.newBuilder()
- .setUrn(getUrn(transform.getTransform()))
- .setPayload(payloadBuilder.build().toByteString())
- .build();
- }
- }
-
- @AutoService(TransformPayloadTranslatorRegistrar.class)
- public static class Registrar implements TransformPayloadTranslatorRegistrar {
-
- @Override
- public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
- getTransformPayloadTranslators() {
- return Collections.singletonMap(
- RunnerImplementedSource.class, new RunnerImplementedSourceTranslator());
- }
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java
similarity index 53%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java
index d1cea3a..9e43b59 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java
@@ -25,9 +25,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSourceTranslation.RunnerImplementedSourceTranslator;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -43,15 +43,15 @@ import org.junit.runners.Parameterized.Parameters;
/** Test RunnerImplementedSourceTranslator. */
@RunWith(Parameterized.class)
-public class RunnerImplementedSourceTranslationTest {
+public class PubSubReadPayloadTranslationTest {
private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
private static final String ID_ATTRIBUTE = "id";
private static final String PROJECT = "project";
private static final TopicPath TOPIC = PubsubClient.topicPathFromName(PROJECT, "testTopic");
private static final SubscriptionPath SUBSCRIPTION =
PubsubClient.subscriptionPathFromName(PROJECT, "testSubscription");
- private final RunnerImplementedSourceTranslator sourceTranslator =
- new RunnerImplementedSourceTranslator();
+ private final PubSubPayloadTranslation.PubSubReadPayloadTranslator sourceTranslator =
+ new PubSubPayloadTranslation.PubSubReadPayloadTranslator();
public static TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
private static final ValueProvider<TopicPath> TOPIC_PROVIDER = pipeline.newProvider(TOPIC);
@@ -64,16 +64,17 @@ public class RunnerImplementedSourceTranslationTest {
new Object[][] {
{
// Read payload only from TOPIC.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- StaticValueProvider.of(TOPIC),
- null /* subscription */,
- null /* timestampLabel */,
- null /* idLabel */,
- false /* needsAttributes */,
- false /* needsMessageId*/)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ StaticValueProvider.of(TOPIC),
+ null /* subscription */,
+ null /* timestampLabel */,
+ null /* idLabel */,
+ false /* needsAttributes */,
+ false /* needsMessageId*/))),
PubSubReadPayload.newBuilder()
.setTopic(TOPIC.getFullPath())
.setWithAttributes(false)
@@ -81,16 +82,17 @@ public class RunnerImplementedSourceTranslationTest {
},
{
// Read with attributes and message id from TOPIC.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- StaticValueProvider.of(TOPIC),
- null /* subscription */,
- TIMESTAMP_ATTRIBUTE /* timestampLabel */,
- ID_ATTRIBUTE /* idLabel */,
- true /* needsAttributes */,
- true /* needsMessageId */)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ StaticValueProvider.of(TOPIC),
+ null /* subscription */,
+ TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+ ID_ATTRIBUTE /* idLabel */,
+ true /* needsAttributes */,
+ true /* needsMessageId */))),
PubSubReadPayload.newBuilder()
.setTopic(TOPIC.getFullPath())
.setIdAttribute(ID_ATTRIBUTE)
@@ -100,16 +102,17 @@ public class RunnerImplementedSourceTranslationTest {
},
{
// Read payload from runtime provided topic.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- TOPIC_PROVIDER,
- null /* subscription */,
- null /* timestampLabel */,
- null /* idLabel */,
- false /* needsAttributes */,
- false /* needsMessageId */)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ TOPIC_PROVIDER,
+ null /* subscription */,
+ null /* timestampLabel */,
+ null /* idLabel */,
+ false /* needsAttributes */,
+ false /* needsMessageId */))),
PubSubReadPayload.newBuilder()
.setTopicRuntimeOverridden(((NestedValueProvider) TOPIC_PROVIDER).propertyName())
.setWithAttributes(false)
@@ -117,16 +120,17 @@ public class RunnerImplementedSourceTranslationTest {
},
{
// Read payload with attributes and message id from runtime provided topic.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- TOPIC_PROVIDER,
- null /* subscription */,
- TIMESTAMP_ATTRIBUTE /* timestampLabel */,
- ID_ATTRIBUTE /* idLabel */,
- true /* needsAttributes */,
- true /* needsMessageId */)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ TOPIC_PROVIDER,
+ null /* subscription */,
+ TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+ ID_ATTRIBUTE /* idLabel */,
+ true /* needsAttributes */,
+ true /* needsMessageId */))),
PubSubReadPayload.newBuilder()
.setTopicRuntimeOverridden(((NestedValueProvider) TOPIC_PROVIDER).propertyName())
.setIdAttribute(ID_ATTRIBUTE)
@@ -136,16 +140,17 @@ public class RunnerImplementedSourceTranslationTest {
},
{
// Read payload only from SUBSCRIPTION.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- null /* topic */,
- StaticValueProvider.of(SUBSCRIPTION),
- null /* timestampLabel */,
- null /* idLabel */,
- false /* needsAttributes */,
- false /* needsMessageId */)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ null /* topic */,
+ StaticValueProvider.of(SUBSCRIPTION),
+ null /* timestampLabel */,
+ null /* idLabel */,
+ false /* needsAttributes */,
+ false /* needsMessageId */))),
PubSubReadPayload.newBuilder()
.setSubscription(SUBSCRIPTION.getFullPath())
.setWithAttributes(false)
@@ -153,16 +158,17 @@ public class RunnerImplementedSourceTranslationTest {
},
{
// Read payload with attributes and message id from SUBSCRIPTION.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- null /* topic */,
- StaticValueProvider.of(SUBSCRIPTION),
- TIMESTAMP_ATTRIBUTE /* timestampLabel */,
- ID_ATTRIBUTE /* idLabel */,
- true /* needsAttributes */,
- true /* needsMessageId */)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ null /* topic */,
+ StaticValueProvider.of(SUBSCRIPTION),
+ TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+ ID_ATTRIBUTE /* idLabel */,
+ true /* needsAttributes */,
+ true /* needsMessageId */))),
PubSubReadPayload.newBuilder()
.setSubscription(SUBSCRIPTION.getFullPath())
.setIdAttribute(ID_ATTRIBUTE)
@@ -172,16 +178,17 @@ public class RunnerImplementedSourceTranslationTest {
},
{
// Read payload only from runtime provided subscription.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- null /* topic */,
- SUBSCRIPTION_PROVIDER,
- null /* timestampLabel */,
- null /* idLabel */,
- false /* needsAttributes */,
- false /* needsMessageId */)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ null /* topic */,
+ SUBSCRIPTION_PROVIDER,
+ null /* timestampLabel */,
+ null /* idLabel */,
+ false /* needsAttributes */,
+ false /* needsMessageId */))),
PubSubReadPayload.newBuilder()
.setSubscriptionRuntimeOverridden(
((NestedValueProvider) SUBSCRIPTION_PROVIDER).propertyName())
@@ -190,16 +197,17 @@ public class RunnerImplementedSourceTranslationTest {
},
{
// Read payload with attributes and message id from runtime provided subscription.
- new RunnerImplementedSource(
- new PubsubUnboundedSource(
- PubsubTestClient.createFactoryForCreateSubscription(),
- StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
- null /* topic */,
- SUBSCRIPTION_PROVIDER,
- TIMESTAMP_ATTRIBUTE /* timestampLabel */,
- ID_ATTRIBUTE /* idLabel */,
- true /* needsAttributes */,
- true /* needsMessageId */)),
+ Read.from(
+ new PubsubUnboundedSource.PubsubSource(
+ new PubsubUnboundedSource(
+ PubsubTestClient.createFactoryForCreateSubscription(),
+ StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)),
+ null /* topic */,
+ SUBSCRIPTION_PROVIDER,
+ TIMESTAMP_ATTRIBUTE /* timestampLabel */,
+ ID_ATTRIBUTE /* idLabel */,
+ true /* needsAttributes */,
+ true /* needsMessageId */))),
PubSubReadPayload.newBuilder()
.setSubscriptionRuntimeOverridden(
((NestedValueProvider) SUBSCRIPTION_PROVIDER).propertyName())
@@ -212,24 +220,25 @@ public class RunnerImplementedSourceTranslationTest {
}
@Parameter(0)
- public RunnerImplementedSource runnerImplementedSource;
+ public Read.Unbounded<byte[]> readFromPubSub;
@Parameter(1)
public PubSubReadPayload pubsubReadPayload;
@Test
public void testTranslateSourceToFunctionSpec() throws Exception {
- PCollection<byte[]> output = pipeline.apply(runnerImplementedSource);
- AppliedPTransform<?, ?, RunnerImplementedSource> appliedPTransform =
+ PCollection<byte[]> output = pipeline.apply(readFromPubSub);
+ AppliedPTransform<?, ?, Read.Unbounded<byte[]>> appliedPTransform =
AppliedPTransform.of(
- "sink",
+ "ReadFromPubsub",
PValues.expandInput(pipeline.begin()),
PValues.expandOutput(output),
- runnerImplementedSource,
+ readFromPubSub,
pipeline);
SdkComponents components = SdkComponents.create();
components.registerEnvironment(Environments.createDockerEnvironment("java"));
- RunnerApi.FunctionSpec spec = sourceTranslator.translate(appliedPTransform, components);
+ RunnerApi.FunctionSpec spec =
+ sourceTranslator.translate((AppliedPTransform) appliedPTransform, components);
assertEquals(PTransformTranslation.PUBSUB_READ, spec.getUrn());
PubSubReadPayload result = PubSubReadPayload.parseFrom(spec.getPayload());
assertEquals(pubsubReadPayload, result);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
similarity index 80%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
index 52ebd22..872e671 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java
@@ -25,8 +25,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.sdk.io.gcp.pubsub.PubSubPayloadTranslation.PubSubWritePayloadTranslator;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSinkTranslation.RunnerImplementedSinkTranslator;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.PubsubSink;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -44,18 +45,18 @@ import org.junit.runners.JUnit4;
/** Test RunnerImplementedSinkTranslator. */
@RunWith(JUnit4.class)
-public class RunnerImplementedSinkTranslationTest {
+public class PubSubWritePayloadTranslationTest {
private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
private static final String ID_ATTRIBUTE = "id";
private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
- private final RunnerImplementedSinkTranslator sinkTranslator =
- new RunnerImplementedSinkTranslator();
+ private final PubSubPayloadTranslation.PubSubWritePayloadTranslator sinkTranslator =
+ new PubSubWritePayloadTranslator();
@Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Test
public void testTranslateSinkWithTopic() throws Exception {
- PubsubUnboundedSink pubsubSink =
+ PubsubUnboundedSink pubsubUnboundedSink =
new PubsubUnboundedSink(
null,
StaticValueProvider.of(TOPIC),
@@ -66,16 +67,12 @@ public class RunnerImplementedSinkTranslationTest {
0,
Duration.ZERO,
null);
- RunnerImplementedSink runnerImplementedSink = new RunnerImplementedSink(pubsubSink);
+ PubsubUnboundedSink.PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSink);
PCollection<byte[]> input = pipeline.apply(Create.of(new byte[0]));
- PDone output = input.apply(runnerImplementedSink);
- AppliedPTransform<?, ?, RunnerImplementedSink> appliedPTransform =
+ PDone output = input.apply(pubsubSink);
+ AppliedPTransform<?, ?, PubsubSink> appliedPTransform =
AppliedPTransform.of(
- "sink",
- PValues.expandInput(input),
- PValues.expandOutput(output),
- runnerImplementedSink,
- pipeline);
+ "sink", PValues.expandInput(input), PValues.expandOutput(output), pubsubSink, pipeline);
SdkComponents components = SdkComponents.create();
components.registerEnvironment(Environments.createDockerEnvironment("java"));
RunnerApi.FunctionSpec spec = sinkTranslator.translate(appliedPTransform, components);
@@ -91,19 +88,15 @@ public class RunnerImplementedSinkTranslationTest {
@Test
public void testTranslateSinkWithTopicOverridden() throws Exception {
ValueProvider<TopicPath> runtimeProvider = pipeline.newProvider(TOPIC);
- PubsubUnboundedSink pubsubSink =
+ PubsubUnboundedSink pubsubUnboundedSinkSink =
new PubsubUnboundedSink(
null, runtimeProvider, TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, 0, 0, Duration.ZERO, null);
- RunnerImplementedSink runnerImplementedSink = new RunnerImplementedSink(pubsubSink);
+ PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSinkSink);
PCollection<byte[]> input = pipeline.apply(Create.of(new byte[0]));
- PDone output = input.apply(runnerImplementedSink);
- AppliedPTransform<?, ?, RunnerImplementedSink> appliedPTransform =
+ PDone output = input.apply(pubsubSink);
+ AppliedPTransform<?, ?, PubsubSink> appliedPTransform =
AppliedPTransform.of(
- "sink",
- PValues.expandInput(input),
- PValues.expandOutput(output),
- runnerImplementedSink,
- pipeline);
+ "sink", PValues.expandInput(input), PValues.expandOutput(output), pubsubSink, pipeline);
SdkComponents components = SdkComponents.create();
components.registerEnvironment(Environments.createDockerEnvironment("java"));
RunnerApi.FunctionSpec spec = sinkTranslator.translate(appliedPTransform, components);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index e9c2a15..7e7e2e2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -159,13 +159,17 @@ public class PubsubIOExternalTest {
RunnerApi.PTransform writeComposite =
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1));
- // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer
+ // test_namespacetest/PubsubUnboundedSink/PubsubSink
RunnerApi.PTransform writeComposite2 =
- result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(3));
+ result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(1));
- // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer)
+ // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer
+ RunnerApi.PTransform writeComposite3 =
+ result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(3));
+
+ // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer)
RunnerApi.PTransform writeParDo =
- result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(0));
+ result.getComponents().getTransformsOrThrow(writeComposite3.getSubtransforms(0));
RunnerApi.ParDoPayload parDoPayload =
RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 8c25c40..a9564d3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertTrue;
import com.google.api.client.util.Clock;
import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -120,8 +121,12 @@ public class PubsubUnboundedSourceTest {
factory = null;
}
- private static String data(PubsubMessage message) {
- return new String(message.getPayload(), StandardCharsets.UTF_8);
+ private static String data(byte[] message, boolean payloadOnly) throws Exception {
+ if (payloadOnly) {
+ return new String(message, StandardCharsets.UTF_8);
+ }
+ PubsubMessage data = PubsubMessage.parseFrom(message);
+ return new String(data.getData().toByteArray(), StandardCharsets.UTF_8);
}
@Test
@@ -134,12 +139,16 @@ public class PubsubUnboundedSourceTest {
}
@Test
- public void readOneMessage() throws IOException {
+ public void readOneMessage() throws Exception {
setupOneMessage();
PubsubReader reader = primSource.createReader(p.getOptions(), null);
// Read one message.
assertTrue(reader.start());
- assertEquals(DATA, data(reader.getCurrent()));
+ assertEquals(
+ DATA,
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
assertFalse(reader.advance());
// ACK the message.
PubsubCheckpoint checkpoint = reader.getCheckpointMark();
@@ -148,18 +157,26 @@ public class PubsubUnboundedSourceTest {
}
@Test
- public void timeoutAckAndRereadOneMessage() throws IOException {
+ public void timeoutAckAndRereadOneMessage() throws Exception {
setupOneMessage();
PubsubReader reader = primSource.createReader(p.getOptions(), null);
PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
assertTrue(reader.start());
- assertEquals(DATA, data(reader.getCurrent()));
+ assertEquals(
+ DATA,
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
// Let the ACK deadline for the above expire.
now.addAndGet(65 * 1000);
pubsubClient.advance();
// We'll now receive the same message again.
assertTrue(reader.advance());
- assertEquals(DATA, data(reader.getCurrent()));
+ assertEquals(
+ DATA,
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
assertFalse(reader.advance());
// Now ACK the message.
PubsubCheckpoint checkpoint = reader.getCheckpointMark();
@@ -168,13 +185,17 @@ public class PubsubUnboundedSourceTest {
}
@Test
- public void extendAck() throws IOException {
+ public void extendAck() throws Exception {
setupOneMessage();
PubsubReader reader = primSource.createReader(p.getOptions(), null);
PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
// Pull the first message but don't take a checkpoint for it.
assertTrue(reader.start());
- assertEquals(DATA, data(reader.getCurrent()));
+ assertEquals(
+ DATA,
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
// Extend the ack
now.addAndGet(55 * 1000);
pubsubClient.advance();
@@ -190,13 +211,17 @@ public class PubsubUnboundedSourceTest {
}
@Test
- public void timeoutAckExtensions() throws IOException {
+ public void timeoutAckExtensions() throws Exception {
setupOneMessage();
PubsubReader reader = primSource.createReader(p.getOptions(), null);
PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
// Pull the first message but don't take a checkpoint for it.
assertTrue(reader.start());
- assertEquals(DATA, data(reader.getCurrent()));
+ assertEquals(
+ DATA,
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
// Extend the ack.
now.addAndGet(55 * 1000);
pubsubClient.advance();
@@ -212,7 +237,11 @@ public class PubsubUnboundedSourceTest {
pubsubClient.advance();
// Reread the same message.
assertTrue(reader.advance());
- assertEquals(DATA, data(reader.getCurrent()));
+ assertEquals(
+ DATA,
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
// Now ACK the message.
PubsubCheckpoint checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
@@ -220,7 +249,7 @@ public class PubsubUnboundedSourceTest {
}
@Test
- public void multipleReaders() throws IOException {
+ public void multipleReaders() throws Exception {
List<IncomingMessage> incoming = new ArrayList<>();
for (int i = 0; i < 2; i++) {
String data = String.format("data_%d", i);
@@ -239,7 +268,11 @@ public class PubsubUnboundedSourceTest {
PubsubReader reader = primSource.createReader(p.getOptions(), null);
// Consume two messages, only read one.
assertTrue(reader.start());
- assertEquals("data_0", data(reader.getCurrent()));
+ assertEquals(
+ "data_0",
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
// Grab checkpoint.
PubsubCheckpoint checkpoint = reader.getCheckpointMark();
@@ -249,7 +282,11 @@ public class PubsubUnboundedSourceTest {
// Read second message.
assertTrue(reader.advance());
- assertEquals("data_1", data(reader.getCurrent()));
+ assertEquals(
+ "data_1",
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
// Restore from checkpoint.
byte[] checkpointBytes =
@@ -262,7 +299,11 @@ public class PubsubUnboundedSourceTest {
// Re-read second message.
reader = primSource.createReader(p.getOptions(), checkpoint);
assertTrue(reader.start());
- assertEquals("data_1", data(reader.getCurrent()));
+ assertEquals(
+ "data_1",
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())));
// We are done.
assertFalse(reader.advance());
@@ -278,7 +319,7 @@ public class PubsubUnboundedSourceTest {
}
@Test
- public void readManyMessages() throws IOException {
+ public void readManyMessages() throws Exception {
Map<String, Integer> dataToMessageNum = new HashMap<>();
final int m = 97;
@@ -315,7 +356,10 @@ public class PubsubUnboundedSourceTest {
// We'll checkpoint and ack within the 2min limit.
now.addAndGet(30);
pubsubClient.advance();
- String data = data(reader.getCurrent());
+ String data =
+ data(
+ reader.getCurrent(),
+ !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()));
Integer messageNum = dataToMessageNum.remove(data);
// No duplicate messages.
assertNotNull(messageNum);