You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:54:09 UTC
[44/50] [abbrv] beam git commit: Removed coder and parseFn from
PubsubIO.Read
Removed coder and parseFn from PubsubIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25dc94bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25dc94bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25dc94bc
Branch: refs/heads/gearpump-runner
Commit: 25dc94bc971a0cb8848777c48495929f6dbe8f69
Parents: dc0fdcb
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 20 19:03:27 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 23:08:29 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 49 ++++---
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 116 +++++++++++----
.../pubsub/PubsubMessagePayloadOnlyCoder.java | 48 +++++++
.../PubsubMessageWithAttributesCoder.java | 57 ++++++++
.../io/gcp/pubsub/PubsubUnboundedSource.java | 141 +++++++------------
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 32 ++---
.../gcp/pubsub/PubsubUnboundedSourceTest.java | 108 +++++++-------
7 files changed, 350 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 68cc8e8..6f29797 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -86,6 +86,8 @@ import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -103,6 +105,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -856,29 +859,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// ================================================================================
/**
- * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we
- * can instead defer to Windmill's implementation.
+ * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
+ * instead defer to Windmill's implementation.
*/
- private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
- private final PubsubUnboundedSource<T> transform;
+ private static class StreamingPubsubIORead
+ extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> {
+ private final PubsubUnboundedSource transform;
/**
* Builds an instance of this class from the overridden transform.
*/
public StreamingPubsubIORead(
- DataflowRunner runner, PubsubUnboundedSource<T> transform) {
+ DataflowRunner runner, PubsubUnboundedSource transform) {
this.transform = transform;
}
- PubsubUnboundedSource<T> getOverriddenTransform() {
+ PubsubUnboundedSource getOverriddenTransform() {
return transform;
}
@Override
- public PCollection<T> expand(PBegin input) {
- return PCollection.<T>createPrimitiveOutputInternal(
+ public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) {
+ return PCollection.<PubsubIO.PubsubMessage>createPrimitiveOutputInternal(
input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(transform.getElementCoder());
+ .setCoder(new PubsubMessageWithAttributesCoder());
}
@Override
@@ -888,19 +892,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
static {
DataflowPipelineTranslator.registerTransformTranslator(
- StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator<>());
+ StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator());
}
}
/** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */
- private static class StreamingPubsubIOReadTranslator<T>
- implements TransformTranslator<StreamingPubsubIORead<T>> {
+ private static class StreamingPubsubIOReadTranslator
+ implements TransformTranslator<StreamingPubsubIORead> {
@Override
- public void translate(StreamingPubsubIORead<T> transform, TranslationContext context) {
+ public void translate(StreamingPubsubIORead transform, TranslationContext context) {
checkArgument(
context.getPipelineOptions().isStreaming(),
"StreamingPubsubIORead is only for streaming pipelines.");
- PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform();
+ PubsubUnboundedSource overriddenTransform = transform.getOverriddenTransform();
StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider() != null) {
@@ -932,16 +936,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
stepContext.addInput(
PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute());
}
- if (overriddenTransform.getWithAttributesParseFn() != null) {
+ // In both cases, the transform needs to read PubsubMessage. However, in case it needs
+ // the attributes, we supply an identity "parse fn" so the worker will read PubsubMessages
+ // from Windmill and simply pass them around; and in case it doesn't need attributes,
+ // we're already implicitly using a "Coder" that interprets the data as a PubsubMessage's
+ // payload.
+ if (overriddenTransform.getNeedsAttributes()) {
stepContext.addInput(
PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
byteArrayToJsonString(
- serializeToByteArray(overriddenTransform.getWithAttributesParseFn())));
+ serializeToByteArray(new IdentityMessageFn())));
}
stepContext.addOutput(context.getOutput(transform));
}
}
+ private static class IdentityMessageFn
+ extends SimpleFunction<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> {
+ @Override
+ public PubsubIO.PubsubMessage apply(PubsubIO.PubsubMessage input) {
+ return input;
+ }
+ }
+
/**
* Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we
* can instead defer to Windmill's implementation.
http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 129a25f..af8b7d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -24,6 +24,7 @@ import com.google.auto.value.AutoValue;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,7 @@ import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
@@ -44,6 +46,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -461,8 +464,34 @@ public class PubsubIO {
}
/** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */
- public static <T> Read<T> read() {
- return new AutoValue_PubsubIO_Read.Builder<T>().build();
+ private static <T> Read<T> read() {
+ return new AutoValue_PubsubIO_Read.Builder<T>().setNeedsAttributes(false).build();
+ }
+
+ /**
+ * Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The
+ * messages will only contain a {@link PubsubMessage#getMessage() payload}, but no {@link
+ * PubsubMessage#getAttributeMap() attributes}.
+ */
+ public static Read<PubsubMessage> readPubsubMessagesWithoutAttributes() {
+ return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>()
+ .setCoder(PubsubMessagePayloadOnlyCoder.of())
+ .setParseFn(new IdentityMessageFn())
+ .setNeedsAttributes(false)
+ .build();
+ }
+
+ /**
+ * Returns a {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The
+ * messages will contain both a {@link PubsubMessage#getMessage() payload} and {@link
+ * PubsubMessage#getAttributeMap() attributes}.
+ */
+ public static Read<PubsubMessage> readPubsubMessagesWithAttributes() {
+ return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>()
+ .setCoder(PubsubMessageWithAttributesCoder.of())
+ .setParseFn(new IdentityMessageFn())
+ .setNeedsAttributes(true)
+ .build();
}
/**
@@ -470,7 +499,8 @@ public class PubsubIO {
* Pub/Sub stream.
*/
public static Read<String> readStrings() {
- return PubsubIO.<String>read().withCoder(StringUtf8Coder.of());
+ return PubsubIO.<String>read().withCoderAndParseFn(
+ StringUtf8Coder.of(), new ParsePayloadAsUtf8());
}
/**
@@ -478,7 +508,11 @@ public class PubsubIO {
* given type from a Google Cloud Pub/Sub stream.
*/
public static <T extends Message> Read<T> readProtos(Class<T> messageClass) {
- return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass));
+ // TODO: Stop using ProtoCoder and instead parse the payload directly.
+ // We should not be relying on the fact that ProtoCoder's wire format is identical to
+ // the protobuf wire format, as the wire format is not part of a coder's API.
+ ProtoCoder<T> coder = ProtoCoder.of(messageClass);
+ return PubsubIO.<T>read().withCoderAndParseFn(coder, new ParsePayloadUsingCoder<>(coder));
}
/**
@@ -486,7 +520,11 @@ public class PubsubIO {
* given type from a Google Cloud Pub/Sub stream.
*/
public static <T extends Message> Read<T> readAvros(Class<T> clazz) {
- return PubsubIO.<T>read().withCoder(AvroCoder.of(clazz));
+ // TODO: Stop using AvroCoder and instead parse the payload directly.
+ // We should not be relying on the fact that AvroCoder's wire format is identical to
+ // the Avro wire format, as the wire format is not part of a coder's API.
+ AvroCoder<T> coder = AvroCoder.of(clazz);
+ return PubsubIO.<T>read().withCoderAndParseFn(coder, new ParsePayloadUsingCoder<>(coder));
}
/** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */
@@ -543,6 +581,8 @@ public class PubsubIO {
@Nullable
abstract SimpleFunction<PubsubMessage, T> getParseFn();
+ abstract boolean getNeedsAttributes();
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -559,6 +599,8 @@ public class PubsubIO {
abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn);
+ abstract Builder<T> setNeedsAttributes(boolean needsAttributes);
+
abstract Read<T> build();
}
@@ -665,20 +707,13 @@ public class PubsubIO {
}
/**
- * Uses the given {@link Coder} to decode each record into a value of type {@code T}.
- */
- public Read<T> withCoder(Coder<T> coder) {
- return toBuilder().setCoder(coder).build();
- }
-
- /**
* Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the
* given parsing function to transform the PubsubMessage into an output type.
* A Coder for the output type T must be registered or set on the output via
* {@link PCollection#setCoder(Coder)}.
*/
- public Read<T> withParseFn(SimpleFunction<PubsubMessage, T> parseFn) {
- return toBuilder().setParseFn(parseFn).build();
+ private Read<T> withCoderAndParseFn(Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
+ return toBuilder().setCoder(coder).setParseFn(parseFn).build();
}
@Override
@@ -691,10 +726,6 @@ public class PubsubIO {
throw new IllegalStateException(
"Can't set both the topic and the subscription for " + "a PubsubIO.Read transform");
}
- if (getCoder() == null) {
- throw new IllegalStateException(
- "PubsubIO.Read requires that a coder be set using " + "the withCoder method.");
- }
@Nullable
ValueProvider<ProjectPath> projectPath =
@@ -711,17 +742,23 @@ public class PubsubIO {
getSubscriptionProvider() == null
? null
: NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator());
- PubsubUnboundedSource<T> source =
- new PubsubUnboundedSource<T>(
+ PubsubUnboundedSource source =
+ new PubsubUnboundedSource(
FACTORY,
projectPath,
topicPath,
subscriptionPath,
- getCoder(),
getTimestampAttribute(),
getIdAttribute(),
- getParseFn());
- return input.getPipeline().apply(source);
+ getNeedsAttributes());
+ return input
+ .getPipeline()
+ .apply(source)
+ .setCoder(
+ getNeedsAttributes()
+ ? PubsubMessageWithAttributesCoder.of()
+ : PubsubMessagePayloadOnlyCoder.of())
+ .apply(MapElements.via(getParseFn()));
}
@Override
@@ -847,7 +884,7 @@ public class PubsubIO {
* function translates the input type T to a PubsubMessage object, which is used by the sink
* to separately set the PubSub message's payload and attributes.
*/
- public Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
+ private Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) {
return toBuilder().setFormatFn(formatFn).build();
}
@@ -954,4 +991,35 @@ public class PubsubIO {
}
}
}
+
+ private static class ParsePayloadAsUtf8 extends SimpleFunction<PubsubMessage, String> {
+ @Override
+ public String apply(PubsubMessage input) {
+ return new String(input.getMessage(), StandardCharsets.UTF_8);
+ }
+ }
+
+ private static class ParsePayloadUsingCoder<T> extends SimpleFunction<PubsubMessage, T> {
+ private Coder<T> coder;
+
+ public ParsePayloadUsingCoder(Coder<T> coder) {
+ this.coder = coder;
+ }
+
+ @Override
+ public T apply(PubsubMessage input) {
+ try {
+ return CoderUtils.decodeFromByteArray(coder, input.getMessage());
+ } catch (CoderException e) {
+ throw new RuntimeException("Could not decode Pubsub message", e);
+ }
+ }
+ }
+
+ private static class IdentityMessageFn extends SimpleFunction<PubsubMessage, PubsubMessage> {
+ @Override
+ public PubsubMessage apply(PubsubMessage input) {
+ return input;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
new file mode 100644
index 0000000..f0dae46
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.util.StreamUtils;
+
+/** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */
+public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubIO.PubsubMessage> {
+ public static PubsubMessagePayloadOnlyCoder of() {
+ return new PubsubMessagePayloadOnlyCoder();
+ }
+
+ @Override
+ public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context)
+ throws IOException {
+ checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
+ outStream.write(value.getMessage());
+ }
+
+ @Override
+ public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException {
+ checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
+ return new PubsubIO.PubsubMessage(
+ StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
new file mode 100644
index 0000000..27f0f02
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/** A coder for PubsubMessage including attributes. */
+public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.PubsubMessage> {
+ private static final Coder<byte[]> PAYLOAD_CODER =
+ NullableCoder.of(ByteArrayCoder.of());
+ private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of(
+ StringUtf8Coder.of(), StringUtf8Coder.of());
+
+ public static PubsubMessageWithAttributesCoder of() {
+ return new PubsubMessageWithAttributesCoder();
+ }
+
+ public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context)
+ throws IOException {
+ PAYLOAD_CODER.encode(
+ value.getMessage(),
+ outStream,
+ context.nested());
+ ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
+ }
+
+ @Override
+ public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException {
+ byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested());
+ Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);
+ return new PubsubIO.PubsubMessage(payload, attributes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 903ae41..d366949 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
@@ -66,13 +65,11 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BucketingFunction;
-import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -110,7 +107,7 @@ import org.slf4j.LoggerFactory;
* {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency.
* </ul>
*/
-public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> {
private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
/**
@@ -121,7 +118,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
/**
* Coder for checkpoints.
*/
- private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>();
+ private static final PubsubCheckpointCoder CHECKPOINT_CODER = new PubsubCheckpointCoder();
/**
* Maximum number of messages per pull.
@@ -231,7 +228,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* we need to restore.
*/
@VisibleForTesting
- static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
+ static class PubsubCheckpoint implements UnboundedSource.CheckpointMark {
/**
* The {@link SubscriptionPath} to the subscription the reader is reading from. May be
* {@code null} if the {@link PubsubUnboundedSource} contains the subscription.
@@ -247,7 +244,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* the 'true' active reader may have changed.
*/
@Nullable
- private PubsubReader<T> reader;
+ private PubsubReader reader;
/**
* If the checkpoint is for persisting: The ACK ids of messages which have been passed
@@ -268,7 +265,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
public PubsubCheckpoint(
@Nullable String subscriptionPath,
- @Nullable PubsubReader<T> reader,
+ @Nullable PubsubReader reader,
@Nullable List<String> safeToAckIds,
List<String> notYetReadIds) {
this.subscriptionPath = subscriptionPath;
@@ -327,7 +324,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
/**
* Return current time according to {@code reader}.
*/
- private static long now(PubsubReader<?> reader) {
+ private static long now(PubsubReader reader) {
if (reader.outer.outer.clock == null) {
return System.currentTimeMillis();
} else {
@@ -340,7 +337,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* NACK all messages which have been read from Pubsub but not passed downstream.
* This way Pubsub will send them again promptly.
*/
- public void nackAll(PubsubReader<T> reader) throws IOException {
+ public void nackAll(PubsubReader reader) throws IOException {
checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
List<String> batchYetToAckIds =
new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE));
@@ -360,13 +357,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
/** The coder for our checkpoints. */
- private static class PubsubCheckpointCoder<T> extends CustomCoder<PubsubCheckpoint<T>> {
+ private static class PubsubCheckpointCoder extends CustomCoder<PubsubCheckpoint> {
private static final Coder<String> SUBSCRIPTION_PATH_CODER =
NullableCoder.of(StringUtf8Coder.of());
private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
@Override
- public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context)
+ public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
throws IOException {
SUBSCRIPTION_PATH_CODER.encode(
value.subscriptionPath,
@@ -376,10 +373,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException {
+ public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException {
String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
- return new PubsubCheckpoint<>(path, null, null, notYetReadIds);
+ return new PubsubCheckpoint(path, null, null, notYetReadIds);
}
}
@@ -392,16 +389,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* but not yet consumed downstream and/or ACKed back to Pubsub.
*/
@VisibleForTesting
- static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> {
+ static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubIO.PubsubMessage> {
/**
* For access to topic and checkpointCoder.
*/
- private final PubsubSource<T> outer;
+ private final PubsubSource outer;
@VisibleForTesting
final SubscriptionPath subscription;
- private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn;
-
/**
* Client on which to talk to Pubsub. Contains a null value if the client has been closed.
*/
@@ -593,12 +588,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
/**
* Construct a reader.
*/
- public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription,
- SimpleFunction<PubsubIO.PubsubMessage, T> parseFn)
+ public PubsubReader(PubsubOptions options, PubsubSource outer, SubscriptionPath subscription)
throws IOException, GeneralSecurityException {
this.outer = outer;
this.subscription = subscription;
- this.parseFn = parseFn;
pubsubClient =
new AtomicReference<>(
outer.outer.pubsubFactory.newClient(
@@ -970,20 +963,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public T getCurrent() throws NoSuchElementException {
+ public PubsubIO.PubsubMessage getCurrent() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
- try {
- if (parseFn != null) {
- return parseFn.apply(new PubsubIO.PubsubMessage(
- current.elementBytes, current.attributes));
- } else {
- return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
- }
- } catch (CoderException e) {
- throw new RuntimeException("Unable to decode element from Pubsub message: ", e);
- }
+ return new PubsubIO.PubsubMessage(current.elementBytes, current.attributes);
}
@Override
@@ -1031,7 +1015,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public PubsubSource<T> getCurrentSource() {
+ public PubsubSource getCurrentSource() {
return outer;
}
@@ -1073,7 +1057,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public PubsubCheckpoint<T> getCheckpointMark() {
+ public PubsubCheckpoint getCheckpointMark() {
int cur = numInFlightCheckpoints.incrementAndGet();
maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur);
// It's possible for a checkpoint to be taken but never finalized.
@@ -1087,10 +1071,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
if (outer.subscriptionPath == null) {
// need to include the subscription in case we resume, as it's not stored in the source.
- return new PubsubCheckpoint<>(
+ return new PubsubCheckpoint(
subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds);
}
- return new PubsubCheckpoint<>(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
+ return new PubsubCheckpoint(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds);
}
@Override
@@ -1104,28 +1088,28 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
// ================================================================================
@VisibleForTesting
- static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> {
- public final PubsubUnboundedSource<T> outer;
+ static class PubsubSource extends UnboundedSource<PubsubIO.PubsubMessage, PubsubCheckpoint> {
+ public final PubsubUnboundedSource outer;
// The subscription to read from.
@VisibleForTesting
final SubscriptionPath subscriptionPath;
- public PubsubSource(PubsubUnboundedSource<T> outer) {
+ public PubsubSource(PubsubUnboundedSource outer) {
this(outer, outer.getSubscription());
}
- private PubsubSource(PubsubUnboundedSource<T> outer, SubscriptionPath subscriptionPath) {
+ private PubsubSource(PubsubUnboundedSource outer, SubscriptionPath subscriptionPath) {
this.outer = outer;
this.subscriptionPath = subscriptionPath;
}
@Override
- public List<PubsubSource<T>> split(
+ public List<PubsubSource> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
- List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits);
- PubsubSource<T> splitSource = this;
+ List<PubsubSource> result = new ArrayList<>(desiredNumSplits);
+ PubsubSource splitSource = this;
if (subscriptionPath == null) {
- splitSource = new PubsubSource<>(outer, outer.createRandomSubscription(options));
+ splitSource = new PubsubSource(outer, outer.createRandomSubscription(options));
}
for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
// Since the source is immutable and Pubsub automatically shards we simply
@@ -1136,10 +1120,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public PubsubReader<T> createReader(
+ public PubsubReader createReader(
PipelineOptions options,
- @Nullable PubsubCheckpoint<T> checkpoint) {
- PubsubReader<T> reader;
+ @Nullable PubsubCheckpoint checkpoint) {
+ PubsubReader reader;
SubscriptionPath subscription = subscriptionPath;
if (subscription == null) {
if (checkpoint == null) {
@@ -1151,8 +1135,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
}
try {
- reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription,
- outer.parseFn);
+ reader = new PubsubReader(options.as(PubsubOptions.class), this, subscription);
} catch (GeneralSecurityException | IOException e) {
throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e);
}
@@ -1171,15 +1154,15 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable
@Override
- public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() {
- @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder =
- (PubsubCheckpointCoder<T>) CHECKPOINT_CODER;
+ public Coder<PubsubCheckpoint> getCheckpointMarkCoder() {
+ @SuppressWarnings("unchecked") PubsubCheckpointCoder typedCoder =
+ (PubsubCheckpointCoder) CHECKPOINT_CODER;
return typedCoder;
}
@Override
- public Coder<T> getDefaultOutputCoder() {
- return outer.elementCoder;
+ public Coder<PubsubIO.PubsubMessage> getDefaultOutputCoder() {
+ return new PubsubMessageWithAttributesCoder();
}
@Override
@@ -1198,7 +1181,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
// StatsFn
// ================================================================================
- private static class StatsFn<T> extends DoFn<T, T> {
+ private static class StatsFn extends DoFn<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> {
private final Counter elementCounter = SourceMetrics.elementsRead();
private final PubsubClientFactory pubsubFactory;
@@ -1292,13 +1275,6 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
private ValueProvider<SubscriptionPath> subscription;
/**
- * Coder for elements. Elements are effectively double-encoded: first to a byte array
- * using this checkpointCoder, then to a base-64 string to conform to Pubsub's payload
- * conventions.
- */
- private final Coder<T> elementCoder;
-
- /**
* Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
* Pubsub message publish timestamp instead.
*/
@@ -1312,12 +1288,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable
private final String idAttribute;
- /**
- * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be
- * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes.
- */
- @Nullable
- SimpleFunction<PubsubIO.PubsubMessage, T> parseFn;
+ /** Whether this source should load the attributes of the PubsubMessage, or only the payload. */
+ private final boolean needsAttributes;
@VisibleForTesting
PubsubUnboundedSource(
@@ -1326,10 +1298,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<ProjectPath> project,
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
- Coder<T> elementCoder,
@Nullable String timestampAttribute,
@Nullable String idAttribute,
- @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
+ boolean needsAttributes) {
checkArgument((topic == null) != (subscription == null),
"Exactly one of topic and subscription must be given");
checkArgument((topic == null) == (project == null),
@@ -1339,10 +1310,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
this.project = project;
this.topic = topic;
this.subscription = subscription;
- this.elementCoder = checkNotNull(elementCoder);
this.timestampAttribute = timestampAttribute;
this.idAttribute = idAttribute;
- this.parseFn = parseFn;
+ this.needsAttributes = needsAttributes;
}
/**
@@ -1353,27 +1323,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable ValueProvider<ProjectPath> project,
@Nullable ValueProvider<TopicPath> topic,
@Nullable ValueProvider<SubscriptionPath> subscription,
- Coder<T> elementCoder,
@Nullable String timestampAttribute,
@Nullable String idAttribute,
- @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
+ boolean needsAttributes) {
this(
null,
pubsubFactory,
project,
topic,
subscription,
- elementCoder,
timestampAttribute,
idAttribute,
- parseFn);
- }
-
- /**
- * Get the coder used for elements.
- */
- public Coder<T> getElementCoder() {
- return elementCoder;
+ needsAttributes);
}
/**
@@ -1432,20 +1393,16 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
return idAttribute;
}
- /**
- * Get the parsing function for PubSub attributes.
- */
- @Nullable
- public SimpleFunction<PubsubIO.PubsubMessage, T> getWithAttributesParseFn() {
- return parseFn;
+ public boolean getNeedsAttributes() {
+ return needsAttributes;
}
@Override
- public PCollection<T> expand(PBegin input) {
+ public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) {
return input.getPipeline().begin()
- .apply(Read.from(new PubsubSource<T>(this)))
+ .apply(Read.from(new PubsubSource(this)))
.apply("PubsubUnboundedSource.Stats",
- ParDo.of(new StatsFn<T>(
+ ParDo.of(new StatsFn(
pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 5f06b88..8f5d1ea 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -49,19 +49,19 @@ public class PubsubIOTest {
@Test
public void testPubsubIOGetName() {
assertEquals("PubsubIO.Read",
- PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName());
+ PubsubIO.readStrings().fromTopic("projects/myproject/topics/mytopic").getName());
assertEquals("PubsubIO.Write",
- PubsubIO.<String>write().to("projects/myproject/topics/mytopic").getName());
+ PubsubIO.writeStrings().to("projects/myproject/topics/mytopic").getName());
}
@Test
public void testTopicValidationSuccess() throws Exception {
- PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc");
- PubsubIO.<String>read().fromTopic("projects/my-project/topics/ABC");
- PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-DeF");
- PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234");
- PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
- PubsubIO.<String>read().fromTopic(new StringBuilder()
+ PubsubIO.readStrings().fromTopic("projects/my-project/topics/abc");
+ PubsubIO.readStrings().fromTopic("projects/my-project/topics/ABC");
+ PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-DeF");
+ PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-1234");
+ PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
+ PubsubIO.readStrings().fromTopic(new StringBuilder()
.append("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -72,13 +72,13 @@ public class PubsubIOTest {
@Test
public void testTopicValidationBadCharacter() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc-*-abc");
+ PubsubIO.readStrings().fromTopic("projects/my-project/topics/abc-*-abc");
}
@Test
public void testTopicValidationTooLong() throws Exception {
thrown.expect(IllegalArgumentException.class);
- PubsubIO.<String>read().fromTopic(new StringBuilder().append
+ PubsubIO.readStrings().fromTopic(new StringBuilder().append
("projects/my-project/topics/A-really-long-one-")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
.append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -91,7 +91,7 @@ public class PubsubIOTest {
String topic = "projects/project/topics/topic";
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
- PubsubIO.Read<String> read = PubsubIO.<String>read()
+ PubsubIO.Read<String> read = PubsubIO.readStrings()
.fromTopic(StaticValueProvider.of(topic))
.withTimestampAttribute("myTimestamp")
.withIdAttribute("myId");
@@ -108,7 +108,7 @@ public class PubsubIOTest {
String topic = "projects/project/topics/topic";
String subscription = "projects/project/subscriptions/subscription";
Duration maxReadTime = Duration.standardMinutes(5);
- PubsubIO.Read<String> read = PubsubIO.<String>read()
+ PubsubIO.Read<String> read = PubsubIO.readStrings()
.fromSubscription(StaticValueProvider.of(subscription))
.withTimestampAttribute("myTimestamp")
.withIdAttribute("myId");
@@ -123,7 +123,7 @@ public class PubsubIOTest {
@Test
public void testNullTopic() {
String subscription = "projects/project/subscriptions/subscription";
- PubsubIO.Read<String> read = PubsubIO.<String>read()
+ PubsubIO.Read<String> read = PubsubIO.readStrings()
.fromSubscription(StaticValueProvider.of(subscription));
assertNull(read.getTopicProvider());
assertNotNull(read.getSubscriptionProvider());
@@ -133,7 +133,7 @@ public class PubsubIOTest {
@Test
public void testNullSubscription() {
String topic = "projects/project/topics/topic";
- PubsubIO.Read<String> read = PubsubIO.<String>read()
+ PubsubIO.Read<String> read = PubsubIO.readStrings()
.fromTopic(StaticValueProvider.of(topic));
assertNotNull(read.getTopicProvider());
assertNull(read.getSubscriptionProvider());
@@ -166,7 +166,7 @@ public class PubsubIOTest {
@Test
public void testWriteDisplayData() {
String topic = "projects/project/topics/topic";
- PubsubIO.Write<?> write = PubsubIO.<String>write()
+ PubsubIO.Write<?> write = PubsubIO.writeStrings()
.to(topic)
.withTimestampAttribute("myTimestamp")
.withIdAttribute("myId");
@@ -182,7 +182,7 @@ public class PubsubIOTest {
@Category(ValidatesRunner.class)
public void testPrimitiveWriteDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PubsubIO.Write<?> write = PubsubIO.<String>write().to("projects/project/topics/topic");
+ PubsubIO.Write<?> write = PubsubIO.writeStrings().to("projects/project/topics/topic");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("PubsubIO.Write should include the topic in its primitive display data",
http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index dc66ea1..592dfa3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -34,12 +34,12 @@ import static org.junit.Assert.assertTrue;
import com.google.api.client.util.Clock;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
@@ -79,7 +79,7 @@ public class PubsubUnboundedSourceTest {
private AtomicLong now;
private Clock clock;
private PubsubTestClientFactory factory;
- private PubsubSource<String> primSource;
+ private PubsubSource primSource;
@Rule
public TestPipeline p = TestPipeline.create();
@@ -93,11 +93,11 @@ public class PubsubUnboundedSourceTest {
}
};
factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
- PubsubUnboundedSource<String> source =
- new PubsubUnboundedSource<>(
+ PubsubUnboundedSource source =
+ new PubsubUnboundedSource(
clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION),
- StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, null);
- primSource = new PubsubSource<>(source);
+ TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, true /* needsAttributes */);
+ primSource = new PubsubSource(source);
}
private void setupOneMessage() {
@@ -114,6 +114,10 @@ public class PubsubUnboundedSourceTest {
factory = null;
}
+ private static String data(PubsubIO.PubsubMessage message) {
+ return new String(message.getMessage(), StandardCharsets.UTF_8);
+ }
+
@Test
public void checkpointCoderIsSane() throws Exception {
setupOneMessage(ImmutableList.<IncomingMessage>of());
@@ -126,13 +130,13 @@ public class PubsubUnboundedSourceTest {
@Test
public void readOneMessage() throws IOException {
setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubReader reader = primSource.createReader(p.getOptions(), null);
// Read one message.
assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
+ assertEquals(DATA, data(reader.getCurrent()));
assertFalse(reader.advance());
// ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@@ -140,19 +144,19 @@ public class PubsubUnboundedSourceTest {
@Test
public void timeoutAckAndRereadOneMessage() throws IOException {
setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubReader reader = primSource.createReader(p.getOptions(), null);
PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
+ assertEquals(DATA, data(reader.getCurrent()));
// Let the ACK deadline for the above expire.
now.addAndGet(65 * 1000);
pubsubClient.advance();
// We'll now receive the same message again.
assertTrue(reader.advance());
- assertEquals(DATA, reader.getCurrent());
+ assertEquals(DATA, data(reader.getCurrent()));
assertFalse(reader.advance());
// Now ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@@ -160,11 +164,11 @@ public class PubsubUnboundedSourceTest {
@Test
public void extendAck() throws IOException {
setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubReader reader = primSource.createReader(p.getOptions(), null);
PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
// Pull the first message but don't take a checkpoint for it.
assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
+ assertEquals(DATA, data(reader.getCurrent()));
// Extend the ack
now.addAndGet(55 * 1000);
pubsubClient.advance();
@@ -174,7 +178,7 @@ public class PubsubUnboundedSourceTest {
pubsubClient.advance();
assertFalse(reader.advance());
// Now ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@@ -182,11 +186,11 @@ public class PubsubUnboundedSourceTest {
@Test
public void timeoutAckExtensions() throws IOException {
setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubReader reader = primSource.createReader(p.getOptions(), null);
PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
// Pull the first message but don't take a checkpoint for it.
assertTrue(reader.start());
- assertEquals(DATA, reader.getCurrent());
+ assertEquals(DATA, data(reader.getCurrent()));
// Extend the ack.
now.addAndGet(55 * 1000);
pubsubClient.advance();
@@ -202,9 +206,9 @@ public class PubsubUnboundedSourceTest {
pubsubClient.advance();
// Reread the same message.
assertTrue(reader.advance());
- assertEquals(DATA, reader.getCurrent());
+ assertEquals(DATA, data(reader.getCurrent()));
// Now ACK the message.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
reader.close();
}
@@ -218,20 +222,20 @@ public class PubsubUnboundedSourceTest {
incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID));
}
setupOneMessage(incoming);
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubReader reader = primSource.createReader(p.getOptions(), null);
// Consume two messages, only read one.
assertTrue(reader.start());
- assertEquals("data_0", reader.getCurrent());
+ assertEquals("data_0", data(reader.getCurrent()));
// Grab checkpoint.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
checkpoint.finalizeCheckpoint();
assertEquals(1, checkpoint.notYetReadIds.size());
assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));
// Read second message.
assertTrue(reader.advance());
- assertEquals("data_1", reader.getCurrent());
+ assertEquals("data_1", data(reader.getCurrent()));
// Restore from checkpoint.
byte[] checkpointBytes =
@@ -244,7 +248,7 @@ public class PubsubUnboundedSourceTest {
// Re-read second message.
reader = primSource.createReader(p.getOptions(), checkpoint);
assertTrue(reader.start());
- assertEquals("data_1", reader.getCurrent());
+ assertEquals("data_1", data(reader.getCurrent()));
// We are done.
assertFalse(reader.advance());
@@ -278,7 +282,7 @@ public class PubsubUnboundedSourceTest {
}
setupOneMessage(incoming);
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubReader reader = primSource.createReader(p.getOptions(), null);
PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
for (int i = 0; i < n; i++) {
@@ -290,7 +294,7 @@ public class PubsubUnboundedSourceTest {
// We'll checkpoint and ack within the 2min limit.
now.addAndGet(30);
pubsubClient.advance();
- String data = reader.getCurrent();
+ String data = data(reader.getCurrent());
Integer messageNum = dataToMessageNum.remove(data);
// No duplicate messages.
assertNotNull(messageNum);
@@ -310,7 +314,7 @@ public class PubsubUnboundedSourceTest {
}
assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
// Ack messages, but only every other finalization.
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
if (i % 2000 == 1999) {
checkpoint.finalizeCheckpoint();
}
@@ -327,26 +331,25 @@ public class PubsubUnboundedSourceTest {
public void noSubscriptionSplitGeneratesSubscription() throws Exception {
TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
factory = PubsubTestClient.createFactoryForCreateSubscription();
- PubsubUnboundedSource<String> source =
- new PubsubUnboundedSource<>(
+ PubsubUnboundedSource source =
+ new PubsubUnboundedSource(
factory,
StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
StaticValueProvider.of(topicPath),
- null,
- StringUtf8Coder.of(),
- null,
- null,
- null);
+ null /* subscription */,
+ null /* timestampAttribute */,
+ null /* idAttribute */,
+ false /* needsAttributes */);
assertThat(source.getSubscription(), nullValue());
assertThat(source.getSubscription(), nullValue());
PipelineOptions options = PipelineOptionsFactory.create();
- List<PubsubSource<String>> splits =
- (new PubsubSource<>(source)).split(3, options);
+ List<PubsubSource> splits =
+ (new PubsubSource(source)).split(3, options);
// We have at least one returned split
assertThat(splits, hasSize(greaterThan(0)));
- for (PubsubSource<String> split : splits) {
+ for (PubsubSource split : splits) {
// Each split is equal
assertThat(split, equalTo(splits.get(0)));
}
@@ -358,37 +361,36 @@ public class PubsubUnboundedSourceTest {
public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
factory = PubsubTestClient.createFactoryForCreateSubscription();
- PubsubUnboundedSource<String> source =
- new PubsubUnboundedSource<>(
+ PubsubUnboundedSource source =
+ new PubsubUnboundedSource(
factory,
StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
StaticValueProvider.of(topicPath),
- null,
- StringUtf8Coder.of(),
- null,
- null,
- null);
+ null /* subscription */,
+ null /* timestampAttribute */,
+ null /* idAttribute */,
+ false /* needsAttributes */);
assertThat(source.getSubscription(), nullValue());
assertThat(source.getSubscription(), nullValue());
PipelineOptions options = PipelineOptionsFactory.create();
- PubsubSource<String> actualSource = new PubsubSource<>(source);
- PubsubReader<String> reader = actualSource.createReader(options, null);
+ PubsubSource actualSource = new PubsubSource(source);
+ PubsubReader reader = actualSource.createReader(options, null);
SubscriptionPath createdSubscription = reader.subscription;
assertThat(createdSubscription, not(nullValue()));
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));
checkpoint.finalizeCheckpoint();
- PubsubCheckpoint<String> deserCheckpoint =
+ PubsubCheckpoint deserCheckpoint =
CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
assertThat(checkpoint.subscriptionPath, not(nullValue()));
assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));
- PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint);
- PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint);
+ PubsubReader readerFromOriginal = actualSource.createReader(options, checkpoint);
+ PubsubReader readerFromDeser = actualSource.createReader(options, deserCheckpoint);
assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
@@ -400,9 +402,9 @@ public class PubsubUnboundedSourceTest {
@Test
public void closeWithActiveCheckpoints() throws Exception {
setupOneMessage();
- PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+ PubsubReader reader = primSource.createReader(p.getOptions(), null);
reader.start();
- PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+ PubsubCheckpoint checkpoint = reader.getCheckpointMark();
reader.close();
checkpoint.finalizeCheckpoint();
}