You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/20 01:25:06 UTC
incubator-beam git commit: PubsubIO: integrate the new PubsubUnboundedSource and Sink
Repository: incubator-beam
Updated Branches:
refs/heads/master 662e49351 -> 26941f152
PubsubIO: integrate the new PubsubUnboundedSource and Sink
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26941f15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26941f15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26941f15
Branch: refs/heads/master
Commit: 26941f152cb5bed422ff14ccb10403604a611130
Parents: 662e493
Author: Mark Shields <ma...@google.com>
Authored: Mon Apr 11 17:36:27 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 19 18:24:56 2016 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineRunner.java | 249 ++++++++++++++++---
.../dataflow/DataflowPipelineTranslator.java | 8 -
.../dataflow/internal/PubsubIOTranslator.java | 108 --------
.../dataflow/io/DataflowPubsubIOTest.java | 13 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 102 +++++---
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 67 ++++-
.../beam/sdk/io/PubsubUnboundedSource.java | 131 ++++++++--
.../beam/sdk/util/PubsubApiaryClient.java | 20 +-
.../org/apache/beam/sdk/util/PubsubClient.java | 82 ++++--
.../apache/beam/sdk/util/PubsubGrpcClient.java | 19 +-
.../apache/beam/sdk/util/PubsubTestClient.java | 21 +-
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 48 ++--
.../beam/sdk/io/PubsubUnboundedSourceTest.java | 8 +-
.../beam/sdk/util/PubsubApiaryClientTest.java | 12 +-
.../beam/sdk/util/PubsubGrpcClientTest.java | 12 +-
.../beam/sdk/util/PubsubTestClientTest.java | 12 +-
16 files changed, 604 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
index 8801896..0c77191 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
@@ -33,7 +33,6 @@ import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
-import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -63,6 +62,8 @@ import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.PubsubUnboundedSink;
+import org.apache.beam.sdk.io.PubsubUnboundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.ShardNameTemplate;
import org.apache.beam.sdk.io.TextIO;
@@ -107,6 +108,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
@@ -177,6 +179,7 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import javax.annotation.Nullable;
/**
* A {@link PipelineRunner} that executes the operations in the
@@ -338,33 +341,46 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
this.pcollectionsRequiringIndexedFormat = new HashSet<>();
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
+ ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
if (options.isStreaming()) {
- overrides = ImmutableMap.<Class<?>, Class<?>>builder()
- .put(Combine.GloballyAsSingletonView.class, StreamingCombineGloballyAsSingletonView.class)
- .put(Create.Values.class, StreamingCreate.class)
- .put(View.AsMap.class, StreamingViewAsMap.class)
- .put(View.AsMultimap.class, StreamingViewAsMultimap.class)
- .put(View.AsSingleton.class, StreamingViewAsSingleton.class)
- .put(View.AsList.class, StreamingViewAsList.class)
- .put(View.AsIterable.class, StreamingViewAsIterable.class)
- .put(Write.Bound.class, StreamingWrite.class)
- .put(PubsubIO.Write.Bound.class, StreamingPubsubIOWrite.class)
- .put(Read.Unbounded.class, StreamingUnboundedRead.class)
- .put(Read.Bounded.class, UnsupportedIO.class)
- .put(AvroIO.Read.Bound.class, UnsupportedIO.class)
- .put(AvroIO.Write.Bound.class, UnsupportedIO.class)
- .put(BigQueryIO.Read.Bound.class, UnsupportedIO.class)
- .put(TextIO.Read.Bound.class, UnsupportedIO.class)
- .put(TextIO.Write.Bound.class, UnsupportedIO.class)
- .put(Window.Bound.class, AssignWindows.class)
- .build();
+ builder.put(Combine.GloballyAsSingletonView.class,
+ StreamingCombineGloballyAsSingletonView.class);
+ builder.put(Create.Values.class, StreamingCreate.class);
+ builder.put(View.AsMap.class, StreamingViewAsMap.class);
+ builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
+ builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
+ builder.put(View.AsList.class, StreamingViewAsList.class);
+ builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
+ builder.put(Write.Bound.class, StreamingWrite.class);
+ builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
+ builder.put(Read.Bounded.class, UnsupportedIO.class);
+ builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
+ builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
+ builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
+ builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
+ builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
+ builder.put(Window.Bound.class, AssignWindows.class);
+ // In streaming mode must use either the custom Pubsub unbounded source/sink or
+ // defer to Windmill's built-in implementation.
+ builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class);
+ builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class);
+ if (options.getExperiments() == null
+ || !options.getExperiments().contains("enable_custom_pubsub_source")) {
+ builder.put(PubsubUnboundedSource.class, StreamingPubsubIORead.class);
+ }
+ if (options.getExperiments() == null
+ || !options.getExperiments().contains("enable_custom_pubsub_sink")) {
+ builder.put(PubsubUnboundedSink.class, StreamingPubsubIOWrite.class);
+ }
} else {
- ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder();
builder.put(Read.Unbounded.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
builder.put(Write.Bound.class, BatchWrite.class);
builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class);
+ // In batch mode must use the custom Pubsub bounded source/sink.
+ builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
+ builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
if (options.getExperiments() == null
|| !options.getExperiments().contains("disable_ism_side_input")) {
builder.put(View.AsMap.class, BatchViewAsMap.class);
@@ -373,8 +389,8 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
builder.put(View.AsList.class, BatchViewAsList.class);
builder.put(View.AsIterable.class, BatchViewAsIterable.class);
}
- overrides = builder.build();
}
+ overrides = builder.build();
}
/**
@@ -2336,27 +2352,104 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
}
}
+ // ================================================================================
+ // PubsubIO translations
+ // ================================================================================
+
/**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.io.PubsubIO.Write PubsubIO.Write} for the
- * Dataflow runner in streaming mode.
- *
- * <p>For internal use only. Subject to change at any time.
- *
- * <p>Public so the {@link PubsubIOTranslator} can access.
+ * Suppress application of {@link PubsubUnboundedSource#apply} in streaming mode so that we
+ * can instead defer to Windmill's implementation.
*/
- public static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
- private final PubsubIO.Write.Bound<T> transform;
+ private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> {
+ // The replaced PubsubUnboundedSource; the translator reads its topic, subscription,
+ // timestamp label and id label from here instead of expanding the source.
+ private final PubsubUnboundedSource<T> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ *
+ * <p>The {@code runner} argument is not used; presumably it is required by the
+ * runner's override-instantiation mechanism (TODO confirm against DataflowPipelineRunner#apply).
+ */
+ public StreamingPubsubIORead(
+ DataflowPipelineRunner runner, PubsubUnboundedSource<T> transform) {
+ this.transform = transform;
+ }
+
+ /** Returns the PubsubUnboundedSource this override replaces. */
+ PubsubUnboundedSource<T> getOverriddenTransform() {
+ return transform;
+ }
+
+ /**
+ * Produces a primitive, unbounded output in the global default windowing strategy,
+ * coded with the source's element coder, without applying the source's own expansion.
+ */
+ @Override
+ public PCollection<T> apply(PBegin input) {
+ return PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+ .setCoder(transform.getElementCoder());
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingPubsubIORead";
+ }
+
+ // Registers the translator for this override when the class is initialized.
+ static {
+ DataflowPipelineTranslator.registerTransformTranslator(
+ StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator());
+ }
+ }
+
+ /**
+ * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node.
+ *
+ * <p>Emits a "ParallelRead" step with format "pubsub". The topic, subscription,
+ * timestamp label and id label of the overridden source are added as step inputs
+ * only when they are non-null. Fails with IllegalArgumentException outside
+ * streaming mode.
+ */
+ private static class StreamingPubsubIOReadTranslator implements
+ TransformTranslator<StreamingPubsubIORead> {
+ // Raw-typed entry point (the translator is registered against the raw class literal);
+ // delegates to a generic helper to recover the element type T.
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void translate(
+ StreamingPubsubIORead transform,
+ TranslationContext context) {
+ translateTyped(transform, context);
+ }
+
+ private <T> void translateTyped(
+ StreamingPubsubIORead<T> transform,
+ TranslationContext context) {
+ checkArgument(context.getPipelineOptions().isStreaming(),
+ "StreamingPubsubIORead is only for streaming pipelines.");
+ PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform();
+ context.addStep(transform, "ParallelRead");
+ context.addInput(PropertyNames.FORMAT, "pubsub");
+ // Paths are passed in their v1beta1 form.
+ if (overriddenTransform.getTopic() != null) {
+ context.addInput(PropertyNames.PUBSUB_TOPIC,
+ overriddenTransform.getTopic().getV1Beta1Path());
+ }
+ if (overriddenTransform.getSubscription() != null) {
+ context.addInput(
+ PropertyNames.PUBSUB_SUBSCRIPTION,
+ overriddenTransform.getSubscription().getV1Beta1Path());
+ }
+ if (overriddenTransform.getTimestampLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
+ overriddenTransform.getTimestampLabel());
+ }
+ if (overriddenTransform.getIdLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+ }
+ context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
+ }
+ }
+
+ /**
+ * Suppress application of {@link PubsubUnboundedSink#apply} in streaming mode so that we
+ * can instead defer to Windmill's implementation.
+ */
+ private static class StreamingPubsubIOWrite<T> extends PTransform<PCollection<T>, PDone> {
+ private final PubsubUnboundedSink<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
public StreamingPubsubIOWrite(
- DataflowPipelineRunner runner, PubsubIO.Write.Bound<T> transform) {
+ DataflowPipelineRunner runner, PubsubUnboundedSink<T> transform) {
this.transform = transform;
}
- public PubsubIO.Write.Bound<T> getOverriddenTransform() {
+ PubsubUnboundedSink<T> getOverriddenTransform() {
return transform;
}
@@ -2369,8 +2462,51 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
protected String getKindString() {
return "StreamingPubsubIOWrite";
}
+
+ static {
+ DataflowPipelineTranslator.registerTransformTranslator(
+ StreamingPubsubIOWrite.class, new StreamingPubsubIOWriteTranslator());
+ }
+ }
+
+ /**
+ * Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node.
+ *
+ * <p>Emits a "ParallelWrite" step with format "pubsub". The sink's topic is always
+ * passed. The timestamp and id labels are passed only when non-null. The input is
+ * encoded with a value-only coder wrapping the sink's element coder. Fails with
+ * IllegalArgumentException outside streaming mode.
+ */
+ private static class StreamingPubsubIOWriteTranslator implements
+ TransformTranslator<StreamingPubsubIOWrite> {
+
+ // Raw-typed entry point (the translator is registered against the raw class literal);
+ // delegates to a generic helper to recover the element type T.
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void translate(
+ StreamingPubsubIOWrite transform,
+ TranslationContext context) {
+ translateTyped(transform, context);
+ }
+
+ private <T> void translateTyped(
+ StreamingPubsubIOWrite<T> transform,
+ TranslationContext context) {
+ checkArgument(context.getPipelineOptions().isStreaming(),
+ "StreamingPubsubIOWrite is only for streaming pipelines.");
+ PubsubUnboundedSink<T> overriddenTransform = transform.getOverriddenTransform();
+ context.addStep(transform, "ParallelWrite");
+ context.addInput(PropertyNames.FORMAT, "pubsub");
+ // Unlike the read translator, the topic is dereferenced unconditionally here.
+ context.addInput(PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
+ if (overriddenTransform.getTimestampLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
+ overriddenTransform.getTimestampLabel());
+ }
+ if (overriddenTransform.getIdLabel() != null) {
+ context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+ }
+ context.addEncodingInput(
+ WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
+ context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+ }
}
+ // ================================================================================
+
/**
* Specialized implementation for
* {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
@@ -2912,11 +3048,14 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
}
/**
- * Specialized expansion for unsupported IO transforms that throws an error.
+ * Specialized expansion for unsupported IO transforms and DoFns that throws an error.
*/
private static class UnsupportedIO<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
+ @Nullable
private PTransform<?, ?> transform;
+ @Nullable
+ private DoFn<?, ?> doFn;
/**
* Builds an instance of this class from the overridden transform.
@@ -2974,13 +3113,51 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
this.transform = transform;
}
+ /**
+ * Builds an instance of this class from the overridden doFn.
+ */
+ @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+ public UnsupportedIO(DataflowPipelineRunner runner,
+ PubsubIO.Read.Bound<?>.PubsubBoundedReader doFn) {
+ this.doFn = doFn;
+ }
+
+ /**
+ * Builds an instance of this class from the overridden doFn.
+ */
+ @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+ public UnsupportedIO(DataflowPipelineRunner runner,
+ PubsubIO.Write.Bound<?>.PubsubBoundedWriter doFn) {
+ this.doFn = doFn;
+ }
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+ public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSource<?> transform) {
+ this.transform = transform;
+ }
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+ public UnsupportedIO(DataflowPipelineRunner runner, PubsubUnboundedSink<?> transform) {
+ this.transform = transform;
+ }
+
+
@Override
public OutputT apply(InputT input) {
String mode = input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()
? "streaming" : "batch";
+ String name =
+ transform == null
+ ? approximateSimpleName(doFn.getClass())
+ : approximatePTransformName(transform.getClass());
throw new UnsupportedOperationException(
- String.format("The DataflowPipelineRunner in %s mode does not support %s.",
- mode, approximatePTransformName(transform.getClass())));
+ String.format("The DataflowPipelineRunner in %s mode does not support %s.", mode, name));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index d822803..7f67393 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -32,7 +32,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
-import org.apache.beam.runners.dataflow.internal.PubsubIOTranslator;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -41,7 +40,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.io.PubsubIO;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -1009,12 +1007,6 @@ public class DataflowPipelineTranslator {
///////////////////////////////////////////////////////////////////////////
// IO Translation.
- registerTransformTranslator(
- PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
- registerTransformTranslator(
- DataflowPipelineRunner.StreamingPubsubIOWrite.class,
- new PubsubIOTranslator.WriteTranslator());
-
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
deleted file mode 100755
index 976f948..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/PubsubIOTranslator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
-import org.apache.beam.sdk.io.PubsubIO;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Pubsub transform support code for the Dataflow backend.
- */
-public class PubsubIOTranslator {
-
- /**
- * Implements PubsubIO Read translation for the Dataflow backend.
- */
- public static class ReadTranslator<T> implements TransformTranslator<PubsubIO.Read.Bound<T>> {
- @Override
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void translate(
- PubsubIO.Read.Bound transform,
- TranslationContext context) {
- translateReadHelper(transform, context);
- }
-
- private <T> void translateReadHelper(
- PubsubIO.Read.Bound<T> transform,
- TranslationContext context) {
- if (!context.getPipelineOptions().isStreaming()) {
- throw new IllegalArgumentException(
- "PubsubIO.Read can only be used with the Dataflow streaming runner.");
- }
-
- context.addStep(transform, "ParallelRead");
- context.addInput(PropertyNames.FORMAT, "pubsub");
- if (transform.getTopic() != null) {
- context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
- }
- if (transform.getSubscription() != null) {
- context.addInput(
- PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path());
- }
- if (transform.getTimestampLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
- }
- if (transform.getIdLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
- }
- context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
- }
- }
-
- /**
- * Implements PubsubIO Write translation for the Dataflow backend.
- */
- public static class WriteTranslator<T>
- implements TransformTranslator<DataflowPipelineRunner.StreamingPubsubIOWrite<T>> {
-
- @Override
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void translate(
- DataflowPipelineRunner.StreamingPubsubIOWrite transform,
- TranslationContext context) {
- translateWriteHelper(transform, context);
- }
-
- private <T> void translateWriteHelper(
- DataflowPipelineRunner.StreamingPubsubIOWrite<T> customTransform,
- TranslationContext context) {
- if (!context.getPipelineOptions().isStreaming()) {
- throw new IllegalArgumentException(
- "PubsubIO.Write is non-primitive for the Dataflow batch runner.");
- }
-
- PubsubIO.Write.Bound<T> transform = customTransform.getOverriddenTransform();
-
- context.addStep(customTransform, "ParallelWrite");
- context.addInput(PropertyNames.FORMAT, "pubsub");
- context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
- if (transform.getTimestampLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
- }
- if (transform.getIdLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
- }
- context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder()));
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
index 4874877..3df9cdb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
@@ -42,21 +42,22 @@ public class DataflowPubsubIOTest {
@Test
public void testPrimitiveWriteDisplayData() {
DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
- PubsubIO.Write.Bound<?> write = PubsubIO.Write
- .topic("projects/project/topics/topic");
+ PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("PubsubIO.Write should include the topic in its primitive display data",
- displayData, hasItem(hasDisplayItem("topic")));
+ displayData, hasItem(hasDisplayItem("topic")));
}
@Test
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
- PubsubIO.Read.Bound<String> read = PubsubIO.Read.topic("projects/project/topics/topic");
+ PubsubIO.Read.Bound<String> read =
+ PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
+ .maxNumRecords(1);
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
- assertThat("PubsubIO.Read should include the topic in its primitive display data",
- displayData, hasItem(hasDisplayItem("topic")));
+ assertThat("PubsubIO.Read should include the subscription in its primitive display data",
+ displayData, hasItem(hasDisplayItem("subscription")));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 78fec85..23a1140 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -36,10 +36,10 @@ import org.apache.beam.sdk.util.PubsubApiaryClient;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
+import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
@@ -54,7 +54,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@@ -634,12 +633,12 @@ public class PubsubIO {
@Override
public PCollection<T> apply(PInput input) {
if (topic == null && subscription == null) {
- throw new IllegalStateException("need to set either the topic or the subscription for "
+ throw new IllegalStateException("Need to set either the topic or the subscription for "
+ "a PubsubIO.Read transform");
}
if (topic != null && subscription != null) {
- throw new IllegalStateException("Can't set both the topic and the subscription for a "
- + "PubsubIO.Read transform");
+ throw new IllegalStateException("Can't set both the topic and the subscription for "
+ + "a PubsubIO.Read transform");
}
boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
@@ -649,9 +648,19 @@ public class PubsubIO {
.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
.apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder);
} else {
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(coder);
+ @Nullable ProjectPath projectPath =
+ topic == null ? null : PubsubClient.projectPathFromId(topic.project);
+ @Nullable TopicPath topicPath =
+ topic == null ? null : PubsubClient.topicPathFromName(topic.project, topic.topic);
+ @Nullable SubscriptionPath subscriptionPath =
+ subscription == null
+ ? null
+ : PubsubClient
+ .subscriptionPathFromName(subscription.project, subscription.subscription);
+ return input.getPipeline().begin()
+ .apply(new PubsubUnboundedSource<T>(
+ FACTORY, projectPath, topicPath, subscriptionPath,
+ coder, timestampLabel, idLabel));
}
}
@@ -707,12 +716,16 @@ public class PubsubIO {
/**
* Default reader when Pubsub subscription has some form of upper bound.
- * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top of upcoming
- * PubsubUnboundedSource.
- * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
- * service.
+ *
+ * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
+ * of PubsubUnboundedSource.
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+ * service in streaming mode.
+ *
+ * <p>Public so can be suppressed by runners.
*/
- private class PubsubBoundedReader extends DoFn<Void, T> {
+ public class PubsubBoundedReader extends DoFn<Void, T> {
private static final int DEFAULT_PULL_SIZE = 100;
private static final int ACK_TIMEOUT_SEC = 60;
@@ -724,20 +737,20 @@ public class PubsubIO {
PubsubClient.SubscriptionPath subscriptionPath;
if (getSubscription() == null) {
- // Create a randomized subscription derived from the topic name.
- String subscription = getTopic().topic + "_dataflow_" + new Random().nextLong();
+ TopicPath topicPath =
+ PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
// The subscription will be registered under this pipeline's project if we know it.
// Otherwise we'll fall back to the topic's project.
// Note that they don't need to be the same.
- String project = c.getPipelineOptions().as(PubsubOptions.class).getProject();
- if (Strings.isNullOrEmpty(project)) {
- project = getTopic().project;
+ String projectId =
+ c.getPipelineOptions().as(PubsubOptions.class).getProject();
+ if (Strings.isNullOrEmpty(projectId)) {
+ projectId = getTopic().project;
}
- subscriptionPath = PubsubClient.subscriptionPathFromName(project, subscription);
- TopicPath topicPath =
- PubsubClient.topicPathFromName(getTopic().project, getTopic().topic);
+ ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
try {
- pubsubClient.createSubscription(topicPath, subscriptionPath, ACK_TIMEOUT_SEC);
+ subscriptionPath =
+ pubsubClient.createRandomSubscription(projectPath, topicPath, ACK_TIMEOUT_SEC);
} catch (Exception e) {
throw new RuntimeException("Failed to create subscription: ", e);
}
@@ -795,6 +808,12 @@ public class PubsubIO {
}
}
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ Bound.this.populateDisplayData(builder);
+ }
}
}
@@ -961,8 +980,20 @@ public class PubsubIO {
if (topic == null) {
throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}
- input.apply(ParDo.of(new PubsubWriter()));
- return PDone.in(input.getPipeline());
+ switch (input.isBounded()) {
+ case BOUNDED:
+ input.apply(ParDo.of(new PubsubBoundedWriter()));
+ return PDone.in(input.getPipeline());
+ case UNBOUNDED:
+ return input.apply(new PubsubUnboundedSink<T>(
+ FACTORY,
+ PubsubClient.topicPathFromName(topic.project, topic.topic),
+ coder,
+ timestampLabel,
+ idLabel,
+ 100 /* numShards */));
+ }
+ throw new RuntimeException(); // cases are exhaustive.
}
@Override
@@ -993,11 +1024,14 @@ public class PubsubIO {
}
/**
- * Writer to Pubsub which batches messages.
- * <p>NOTE: This is not the implementation used when running on the Google Dataflow hosted
- * service.
+ * Writer to Pubsub which batches messages from bounded collections.
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+ * service in streaming mode.
+ *
+ * <p>Public so can be suppressed by runners.
*/
- private class PubsubWriter extends DoFn<T, Void> {
+ public class PubsubBoundedWriter extends DoFn<T, Void> {
private static final int MAX_PUBLISH_BATCH_SIZE = 100;
private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
@@ -1005,15 +1039,18 @@ public class PubsubIO {
@Override
public void startBundle(Context c) throws IOException {
this.output = new ArrayList<>();
- this.pubsubClient = FACTORY.newClient(timestampLabel, idLabel,
- c.getPipelineOptions().as(PubsubOptions.class));
+ // NOTE: idLabel is ignored.
+ this.pubsubClient =
+ FACTORY.newClient(timestampLabel, null,
+ c.getPipelineOptions().as(PubsubOptions.class));
}
@Override
public void processElement(ProcessContext c) throws IOException {
+ // NOTE: The record id is always null.
OutgoingMessage message =
new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()),
- c.timestamp().getMillis());
+ c.timestamp().getMillis(), null);
output.add(message);
if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
@@ -1041,6 +1078,7 @@ public class PubsubIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
Bound.this.populateDisplayData(builder);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 7ca2b57..6ff9b40 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -52,6 +54,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -62,6 +65,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
@@ -81,6 +85,8 @@ import javax.annotation.Nullable;
* <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
* to dedup messages.
* </ul>
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow service.
*/
public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class);
@@ -104,12 +110,16 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
* Coder for conveying outgoing messages between internal stages.
*/
private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
+ private static final NullableCoder<String> RECORD_ID_CODER =
+ NullableCoder.of(StringUtf8Coder.of());
+
@Override
public void encode(
OutgoingMessage value, OutputStream outStream, Context context)
throws CoderException, IOException {
ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED);
BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED);
+ RECORD_ID_CODER.encode(value.recordId, outStream, Context.NESTED);
}
@Override
@@ -117,7 +127,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
InputStream inStream, Context context) throws CoderException, IOException {
byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED);
long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED);
- return new OutgoingMessage(elementBytes, timestampMsSinceEpoch);
+ @Nullable String recordId = RECORD_ID_CODER.decode(inStream, Context.NESTED);
+ return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId);
}
}
@@ -125,6 +136,23 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
// ================================================================================
+ // RecordIdMethod
+ // ================================================================================
+
+ /**
+ * Specify how record ids are to be generated.
+ */
+ @VisibleForTesting
+ enum RecordIdMethod {
+ /** Leave null. */
+ NONE,
+ /** Generate randomly. */
+ RANDOM,
+ /** Generate deterministically. For testing only. */
+ DETERMINISTIC
+ }
+
+ // ================================================================================
// ShardFn
// ================================================================================
@@ -136,10 +164,12 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
createAggregator("elements", new Sum.SumLongFn());
private final Coder<T> elementCoder;
private final int numShards;
+ private final RecordIdMethod recordIdMethod;
- ShardFn(Coder<T> elementCoder, int numShards) {
+ ShardFn(Coder<T> elementCoder, int numShards, RecordIdMethod recordIdMethod) {
this.elementCoder = elementCoder;
this.numShards = numShards;
+ this.recordIdMethod = recordIdMethod;
}
@Override
@@ -147,9 +177,23 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
elementCounter.addValue(1L);
byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
long timestampMsSinceEpoch = c.timestamp().getMillis();
- // TODO: A random record id should be assigned here.
+ @Nullable String recordId = null;
+ switch (recordIdMethod) {
+ case NONE:
+ break;
+ case DETERMINISTIC:
+ recordId = Hashing.murmur3_128().hashBytes(elementBytes).toString();
+ break;
+ case RANDOM:
+ // Since these elements go through a GroupByKey, any failures while sending to
+ // Pubsub will be retried without falling back and generating a new record id.
+ // Thus even though we may send the same message to Pubsub twice, it is guaranteed
+ // to have the same record id.
+ recordId = UUID.randomUUID().toString();
+ break;
+ }
c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
- new OutgoingMessage(elementBytes, timestampMsSinceEpoch)));
+ new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId)));
}
@Override
@@ -319,6 +363,12 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
*/
private final Duration maxLatency;
+ /**
+ * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal
+ * null}).
+ */
+ private final RecordIdMethod recordIdMethod;
+
@VisibleForTesting
PubsubUnboundedSink(
PubsubClientFactory pubsubFactory,
@@ -329,7 +379,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
int numShards,
int publishBatchSize,
int publishBatchBytes,
- Duration maxLatency) {
+ Duration maxLatency,
+ RecordIdMethod recordIdMethod) {
this.pubsubFactory = pubsubFactory;
this.topic = topic;
this.elementCoder = elementCoder;
@@ -339,6 +390,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
this.maxLatency = maxLatency;
+ this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
}
public PubsubUnboundedSink(
@@ -349,7 +401,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
String idLabel,
int numShards) {
this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
- DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY);
+ DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
+ RecordIdMethod.RANDOM);
}
public TopicPath getTopic() {
@@ -382,7 +435,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
.plusDelayOf(maxLatency))))
.discardingFiredPanes())
.apply(ParDo.named("PubsubUnboundedSink.Shard")
- .of(new ShardFn<T>(elementCoder, numShards)))
+ .of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
.setCoder(KvCoder.of(VarIntCoder.of(), CODER))
.apply(GroupByKey.<Integer, OutgoingMessage>create())
.apply(ParDo.named("PubsubUnboundedSink.Writer")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index d635a8a..0492c76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -42,13 +43,16 @@ import org.apache.beam.sdk.util.BucketingFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import com.google.api.client.util.Clock;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -102,11 +106,18 @@ import javax.annotation.Nullable;
* are blocking. We rely on the underlying runner to allow multiple
* {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency.
* </ul>
+ *
+ * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow service.
*/
public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
/**
+ * Default ACK timeout for created subscriptions.
+ */
+ private static final int DEAULT_ACK_TIMEOUT_SEC = 60;
+
+ /**
* Coder for checkpoints.
*/
private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>();
@@ -292,6 +303,17 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
/**
+ * Return current time according to {@code reader}.
+ */
+ private static long now(PubsubReader reader) {
+ if (reader.outer.outer.clock == null) {
+ return System.currentTimeMillis();
+ } else {
+ return reader.outer.outer.clock.currentTimeMillis();
+ }
+ }
+
+ /**
* BLOCKING
* NACK all messages which have been read from Pubsub but not passed downstream.
* This way Pubsub will send them again promptly.
@@ -303,13 +325,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
for (String ackId : notYetReadIds) {
batchYetToAckIds.add(ackId);
if (batchYetToAckIds.size() >= ACK_BATCH_SIZE) {
- long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+ long nowMsSinceEpoch = now(reader);
reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
batchYetToAckIds.clear();
}
}
if (!batchYetToAckIds.isEmpty()) {
- long nowMsSinceEpoch = reader.outer.outer.clock.currentTimeMillis();
+ long nowMsSinceEpoch = now(reader);
reader.nackBatch(nowMsSinceEpoch, batchYetToAckIds);
}
}
@@ -614,7 +636,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
* Return the current time, in ms since epoch.
*/
private long now() {
- return outer.outer.clock.currentTimeMillis();
+ if (outer.outer.clock == null) {
+ return System.currentTimeMillis();
+ } else {
+ return outer.outer.clock.currentTimeMillis();
+ }
}
/**
@@ -928,7 +954,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
if (current == null) {
throw new NoSuchElementException();
}
- return current.recordId;
+ return current.recordId.getBytes(Charsets.UTF_8);
}
@Override
@@ -1124,8 +1150,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
// ================================================================================
/**
- * Clock to use for all timekeeping.
+ * For testing only: Clock to use for all timekeeping. If {@literal null} use system clock.
*/
+ @Nullable
private Clock clock;
/**
@@ -1134,9 +1161,28 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
private final PubsubClientFactory pubsubFactory;
/**
- * Subscription to read from.
+ * Project under which to create a subscription if only the {@link #topic} was given.
+ */
+ @Nullable
+ private final ProjectPath project;
+
+ /**
+ * Topic to read from. If {@literal null}, then {@link #subscription} must be given.
+ * Otherwise {@link #subscription} must be null.
*/
- private final SubscriptionPath subscription;
+ @Nullable
+ private final TopicPath topic;
+
+ /**
+ * Subscription to read from. If {@literal null} then {@link #topic} must be given.
+ * Otherwise {@link #topic} must be null.
+ *
+ * <p>If no subscription is given a random one will be created when the transform is
+ * applied. This field will be updated with that subscription's path. The created
+ * subscription is never deleted.
+ */
+ @Nullable
+ private SubscriptionPath subscription;
/**
* Coder for elements. Elements are effectively double-encoded: first to a byte array
@@ -1159,25 +1205,60 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
@Nullable
private final String idLabel;
- /**
- * Construct an unbounded source to consume from the Pubsub {@code subscription}.
- */
- public PubsubUnboundedSource(
+ @VisibleForTesting
+ PubsubUnboundedSource(
Clock clock,
PubsubClientFactory pubsubFactory,
- SubscriptionPath subscription,
+ @Nullable ProjectPath project,
+ @Nullable TopicPath topic,
+ @Nullable SubscriptionPath subscription,
Coder<T> elementCoder,
@Nullable String timestampLabel,
@Nullable String idLabel) {
+ checkArgument((topic == null) != (subscription == null),
+ "Exactly one of topic and subscription must be given");
+ checkArgument((topic == null) == (project == null),
+ "Project must be given if topic is given");
this.clock = clock;
this.pubsubFactory = checkNotNull(pubsubFactory);
- this.subscription = checkNotNull(subscription);
+ this.project = project;
+ this.topic = topic;
+ this.subscription = subscription;
this.elementCoder = checkNotNull(elementCoder);
this.timestampLabel = timestampLabel;
this.idLabel = idLabel;
}
- public PubsubClient.SubscriptionPath getSubscription() {
+ /**
+ * Construct an unbounded source to consume from the Pubsub {@code subscription}.
+ */
+ public PubsubUnboundedSource(
+ PubsubClientFactory pubsubFactory,
+ @Nullable ProjectPath project,
+ @Nullable TopicPath topic,
+ @Nullable SubscriptionPath subscription,
+ Coder<T> elementCoder,
+ @Nullable String timestampLabel,
+ @Nullable String idLabel) {
+ this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel);
+ }
+
+ public Coder<T> getElementCoder() {
+ return elementCoder;
+ }
+
+ @Nullable
+ public ProjectPath getProject() {
+ return project;
+ }
+
+ @Nullable
+ public TopicPath getTopic() {
+ return topic;
+ }
+
+ @Nullable
+ public SubscriptionPath getSubscription() {
return subscription;
}
@@ -1191,12 +1272,26 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
return idLabel;
}
- public Coder<T> getElementCoder() {
- return elementCoder;
- }
-
@Override
public PCollection<T> apply(PBegin input) {
+ if (subscription == null) {
+ try {
+ try (PubsubClient pubsubClient =
+ pubsubFactory.newClient(timestampLabel, idLabel,
+ input.getPipeline()
+ .getOptions()
+ .as(PubsubOptions.class))) {
+ subscription =
+ pubsubClient.createRandomSubscription(project, topic, DEAULT_ACK_TIMEOUT_SEC);
+ LOG.warn("Created subscription {} to topic {}."
+ + " Note this subscription WILL NOT be deleted when the pipeline terminates",
+ subscription, topic);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create subscription: ", e);
+ }
+ }
+
return input.getPipeline().begin()
.apply(Read.from(new PubsubSource<T>(this)))
.apply(ParDo.named("PubsubUnboundedSource.Stats")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
index aa73d42..08981d0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java
@@ -40,7 +40,6 @@ import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
import java.io.IOException;
import java.util.ArrayList;
@@ -135,11 +134,8 @@ public class PubsubApiaryClient extends PubsubClient {
attributes.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idLabel != null) {
- // TODO: The id should be associated with the OutgoingMessage so that it is stable
- // across retried bundles
- attributes.put(idLabel,
- Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+ if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ attributes.put(idLabel, outgoingMessage.recordId);
}
pubsubMessages.add(pubsubMessage);
@@ -185,15 +181,13 @@ public class PubsubApiaryClient extends PubsubClient {
checkState(!Strings.isNullOrEmpty(ackId));
// Record id, if any.
- @Nullable byte[] recordId = null;
+ @Nullable String recordId = null;
if (idLabel != null && attributes != null) {
- String recordIdString = attributes.get(idLabel);
- if (!Strings.isNullOrEmpty(recordIdString)) {
- recordId = recordIdString.getBytes();
- }
+ recordId = attributes.get(idLabel);
}
- if (recordId == null) {
- recordId = pubsubMessage.getMessageId().getBytes();
+ if (Strings.isNullOrEmpty(recordId)) {
+ // Fall back to the Pubsub provided message id.
+ recordId = pubsubMessage.getMessageId();
}
incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index dc4858e..07ce97d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -33,6 +33,7 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
/**
@@ -132,6 +133,12 @@ public abstract class PubsubClient implements Closeable {
return path;
}
+ public String getId() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 2, "Malformed project path %s", path); // must agree with splits[1]
+ return splits[1];
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -180,6 +187,12 @@ public abstract class PubsubClient implements Closeable {
return path;
}
+ public String getName() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed subscription path %s", path);
+ return splits[3];
+ }
+
public String getV1Beta1Path() {
String[] splits = path.split("/");
checkState(splits.length == 4, "Malformed subscription path %s", path);
@@ -233,6 +246,12 @@ public abstract class PubsubClient implements Closeable {
return path;
}
+ public String getName() {
+ String[] splits = path.split("/");
+ checkState(splits.length == 4, "Malformed topic path %s", path);
+ return splits[3];
+ }
+
public String getV1Beta1Path() {
String[] splits = path.split("/");
checkState(splits.length == 4, "Malformed topic path %s", path);
@@ -286,11 +305,18 @@ public abstract class PubsubClient implements Closeable {
*/
public final long timestampMsSinceEpoch;
- // TODO: Support a record id.
+ /**
+ * If using an id label, the record id to associate with this record's metadata so the receiver
+ * can reject duplicates. Otherwise {@literal null}.
+ */
+ @Nullable
+ public final String recordId;
- public OutgoingMessage(byte[] elementBytes, long timestampMsSinceEpoch) {
+ public OutgoingMessage(
+ byte[] elementBytes, long timestampMsSinceEpoch, @Nullable String recordId) {
this.elementBytes = elementBytes;
this.timestampMsSinceEpoch = timestampMsSinceEpoch;
+ this.recordId = recordId;
}
@Override
@@ -310,16 +336,14 @@ public abstract class PubsubClient implements Closeable {
OutgoingMessage that = (OutgoingMessage) o;
- if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
- return false;
- }
- return Arrays.equals(elementBytes, that.elementBytes);
-
+ return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+ && Arrays.equals(elementBytes, that.elementBytes)
+ && Objects.equal(recordId, that.recordId);
}
@Override
public int hashCode() {
- return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch);
+ return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch, recordId);
}
}
@@ -353,14 +377,14 @@ public abstract class PubsubClient implements Closeable {
/**
* Id to pass to the runner to distinguish this message from all others.
*/
- public final byte[] recordId;
+ public final String recordId;
public IncomingMessage(
byte[] elementBytes,
long timestampMsSinceEpoch,
long requestTimeMsSinceEpoch,
String ackId,
- byte[] recordId) {
+ String recordId) {
this.elementBytes = elementBytes;
this.timestampMsSinceEpoch = timestampMsSinceEpoch;
this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
@@ -390,26 +414,18 @@ public abstract class PubsubClient implements Closeable {
IncomingMessage that = (IncomingMessage) o;
- if (timestampMsSinceEpoch != that.timestampMsSinceEpoch) {
- return false;
- }
- if (requestTimeMsSinceEpoch != that.requestTimeMsSinceEpoch) {
- return false;
- }
- if (!Arrays.equals(elementBytes, that.elementBytes)) {
- return false;
- }
- if (!ackId.equals(that.ackId)) {
- return false;
- }
- return Arrays.equals(recordId, that.recordId);
+ return timestampMsSinceEpoch == that.timestampMsSinceEpoch
+ && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
+ && ackId.equals(that.ackId)
+ && recordId.equals(that.recordId)
+ && Arrays.equals(elementBytes, that.elementBytes);
}
@Override
public int hashCode() {
return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch,
requestTimeMsSinceEpoch,
- ackId, Arrays.hashCode(recordId));
+ ackId, recordId);
}
}
@@ -485,6 +501,22 @@ public abstract class PubsubClient implements Closeable {
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
/**
+ * Create a random subscription for {@code topic}. Return the {@link SubscriptionPath}. It
+ * is the responsibility of the caller to later delete the subscription.
+ *
+ * @throws IOException
+ */
+ public SubscriptionPath createRandomSubscription(
+ ProjectPath project, TopicPath topic, int ackDeadlineSeconds) throws IOException {
+ // Create a randomized subscription derived from the topic name.
+ String subscriptionName = topic.getName() + "_beam_" + ThreadLocalRandom.current().nextLong();
+ SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(project.getId(), subscriptionName);
+ createSubscription(topic, subscription, ackDeadlineSeconds);
+ return subscription;
+ }
+
+ /**
* Delete {@code subscription}.
*
* @throws IOException
@@ -507,7 +539,7 @@ public abstract class PubsubClient implements Closeable {
public abstract int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException;
/**
- * Return {@literal true} if {@link pull} will always return empty list. Actual clients
+ * Return {@literal true} if {@link #pull} will always return empty list. Actual clients
* will return {@literal false}. Test clients may return {@literal true} to signal that all
* expected messages have been pulled and the test may complete.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index e759513..ac157fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -27,7 +27,6 @@ import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.AcknowledgeRequest;
@@ -257,10 +256,8 @@ public class PubsubGrpcClient extends PubsubClient {
.put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
}
- if (idLabel != null) {
- message.getMutableAttributes()
- .put(idLabel,
- Hashing.murmur3_128().hashBytes(outgoingMessage.elementBytes).toString());
+ if (idLabel != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) {
+ message.getMutableAttributes().put(idLabel, outgoingMessage.recordId);
}
request.addMessages(message);
@@ -308,15 +305,13 @@ public class PubsubGrpcClient extends PubsubClient {
checkState(!Strings.isNullOrEmpty(ackId));
// Record id, if any.
- @Nullable byte[] recordId = null;
+ @Nullable String recordId = null;
if (idLabel != null && attributes != null) {
- String recordIdString = attributes.get(idLabel);
- if (recordIdString != null && !recordIdString.isEmpty()) {
- recordId = recordIdString.getBytes();
- }
+ recordId = attributes.get(idLabel);
}
- if (recordId == null) {
- recordId = pubsubMessage.getMessageId().getBytes();
+ if (Strings.isNullOrEmpty(recordId)) {
+ // Fall back to the Pubsub provided message id.
+ recordId = pubsubMessage.getMessageId();
}
incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index c1dfa06..9fa0380 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -46,10 +46,9 @@ public class PubsubTestClient extends PubsubClient {
* Mimic the state of the simulated Pubsub 'service'.
*
* Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
- * test
- * pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created from
- * the same client factory and run in parallel. Thus we can't enforce aliasing of the following
- * data structures over all clients and must resort to a static.
+ * test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created
+ * from the same client factory and run in parallel. Thus we can't enforce aliasing of the
+ * following data structures over all clients and must resort to a static.
*/
private static class State {
/**
@@ -70,6 +69,13 @@ public class PubsubTestClient extends PubsubClient {
Set<OutgoingMessage> remainingExpectedOutgoingMessages;
/**
+ * Publish mode only: Messages which should throw when first sent to simulate transient publish
+ * failure.
+ */
+ @Nullable
+ Set<OutgoingMessage> remainingFailingOutgoingMessages;
+
+ /**
* Pull mode only: Clock from which to get current time.
*/
@Nullable
@@ -119,11 +125,13 @@ public class PubsubTestClient extends PubsubClient {
*/
public static PubsubTestClientFactory createFactoryForPublish(
final TopicPath expectedTopic,
- final Iterable<OutgoingMessage> expectedOutgoingMessages) {
+ final Iterable<OutgoingMessage> expectedOutgoingMessages,
+ final Iterable<OutgoingMessage> failingOutgoingMessages) {
synchronized (STATE) {
checkState(!STATE.isActive, "Test still in flight");
STATE.expectedTopic = expectedTopic;
STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
+ STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
STATE.isActive = true;
}
return new PubsubTestClientFactory() {
@@ -257,6 +265,9 @@ public class PubsubTestClient extends PubsubClient {
checkState(topic.equals(STATE.expectedTopic), "Topic %s does not match expected %s", topic,
STATE.expectedTopic);
for (OutgoingMessage outgoingMessage : outgoingMessages) {
+ if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
+ throw new RuntimeException("Simulating failure for " + outgoingMessage);
+ }
checkState(STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
"Unexpected outgoing message %s", outgoingMessage);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index b4ef785..bf70e47 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -19,6 +19,7 @@
package org.apache.beam.sdk.io;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.PubsubUnboundedSink.RecordIdMethod;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -31,7 +32,7 @@ import org.apache.beam.sdk.util.PubsubTestClient;
import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -41,9 +42,7 @@ import org.junit.runners.JUnit4;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
/**
* Test PubsubUnboundedSink.
@@ -55,6 +54,7 @@ public class PubsubUnboundedSinkTest {
private static final long TIMESTAMP = 1234L;
private static final String TIMESTAMP_LABEL = "timestamp";
private static final String ID_LABEL = "id";
+ private static final int NUM_SHARDS = 10;
private static class Stamp extends DoFn<String, String> {
@Override
@@ -63,22 +63,30 @@ public class PubsubUnboundedSinkTest {
}
}
+ private String getRecordId(String data) {
+ return Hashing.murmur3_128().hashBytes(data.getBytes()).toString();
+ }
+
@Test
public void saneCoder() throws Exception {
- OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP);
+ OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA));
CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message);
CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
}
@Test
public void sendOneMessage() throws IOException {
- Set<OutgoingMessage> outgoing =
- Sets.newHashSet(new OutgoingMessage(DATA.getBytes(), TIMESTAMP));
+ List<OutgoingMessage> outgoing =
+ ImmutableList.of(new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA)));
+ int batchSize = 1;
+ int batchBytes = 1;
try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+ ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
- 10);
+ NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+ RecordIdMethod.DETERMINISTIC);
TestPipeline p = TestPipeline.create();
p.apply(Create.of(ImmutableList.of(DATA)))
.apply(ParDo.of(new Stamp()))
@@ -91,20 +99,22 @@ public class PubsubUnboundedSinkTest {
@Test
public void sendMoreThanOneBatchByNumMessages() throws IOException {
- Set<OutgoingMessage> outgoing = new HashSet<>();
+ List<OutgoingMessage> outgoing = new ArrayList<>();
List<String> data = new ArrayList<>();
int batchSize = 2;
int batchBytes = 1000;
for (int i = 0; i < batchSize * 10; i++) {
String str = String.valueOf(i);
- outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+ outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str)));
data.add(str);
}
try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+ ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
- 10, batchSize, batchBytes, Duration.standardSeconds(2));
+ NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+ RecordIdMethod.DETERMINISTIC);
TestPipeline p = TestPipeline.create();
p.apply(Create.of(data))
.apply(ParDo.of(new Stamp()))
@@ -117,7 +127,7 @@ public class PubsubUnboundedSinkTest {
@Test
public void sendMoreThanOneBatchByByteSize() throws IOException {
- Set<OutgoingMessage> outgoing = new HashSet<>();
+ List<OutgoingMessage> outgoing = new ArrayList<>();
List<String> data = new ArrayList<>();
int batchSize = 100;
int batchBytes = 10;
@@ -128,15 +138,17 @@ public class PubsubUnboundedSinkTest {
sb.append(String.valueOf(n));
}
String str = sb.toString();
- outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP));
+ outgoing.add(new OutgoingMessage(str.getBytes(), TIMESTAMP, getRecordId(str)));
data.add(str);
n += str.length();
}
try (PubsubTestClientFactory factory =
- PubsubTestClient.createFactoryForPublish(TOPIC, outgoing)) {
+ PubsubTestClient.createFactoryForPublish(TOPIC, outgoing,
+ ImmutableList.<OutgoingMessage>of())) {
PubsubUnboundedSink<String> sink =
new PubsubUnboundedSink<>(factory, TOPIC, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
- 10, batchSize, batchBytes, Duration.standardSeconds(2));
+ NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
+ RecordIdMethod.DETERMINISTIC);
TestPipeline p = TestPipeline.create();
p.apply(Create.of(data))
.apply(ParDo.of(new Stamp()))
@@ -146,4 +158,8 @@ public class PubsubUnboundedSinkTest {
// The PubsubTestClientFactory will assert fail on close if the actual published
// message does not match the expected publish message.
}
+
+ // TODO: We would like to test that failed Pubsub publish calls cause the already assigned
+ // (and random) record ids to be reused. However that can't be done without the test runner
+ // supporting retrying bundles.
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index b265d18..3b0a1c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -86,14 +86,14 @@ public class PubsubUnboundedSourceTest {
};
factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
PubsubUnboundedSource<String> source =
- new PubsubUnboundedSource<>(clock, factory, SUBSCRIPTION, StringUtf8Coder.of(),
+ new PubsubUnboundedSource<>(clock, factory, null, null, SUBSCRIPTION, StringUtf8Coder.of(),
TIMESTAMP_LABEL, ID_LABEL);
primSource = new PubsubSource<>(source);
}
private void setupOneMessage() {
setupOneMessage(ImmutableList.of(
- new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID.getBytes())));
+ new IncomingMessage(DATA.getBytes(), TIMESTAMP, 0, ACK_ID, RECORD_ID)));
}
@After
@@ -211,7 +211,7 @@ public class PubsubUnboundedSourceTest {
for (int i = 0; i < 2; i++) {
String data = String.format("data_%d", i);
String ackid = String.format("ackid_%d", i);
- incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID.getBytes()));
+ incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID));
}
setupOneMessage(incoming);
TestPipeline p = TestPipeline.create();
@@ -272,7 +272,7 @@ public class PubsubUnboundedSourceTest {
String recid = String.format("recordid_%d", messageNum);
String ackId = String.format("ackid_%d", messageNum);
incoming.add(new IncomingMessage(data.getBytes(), messageNumToTimestamp(messageNum), 0,
- ackId, recid.getBytes()));
+ ackId, recid));
}
setupOneMessage(incoming);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
index 40c31fb..0f3a7bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java
@@ -34,7 +34,6 @@ import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hashing;
import org.junit.After;
import org.junit.Before;
@@ -61,8 +60,7 @@ public class PubsubApiaryClientTest {
private static final String ID_LABEL = "id";
private static final String MESSAGE_ID = "testMessageId";
private static final String DATA = "testData";
- private static final String CUSTOM_ID =
- Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+ private static final String RECORD_ID = "testRecordId";
private static final String ACK_ID = "testAckId";
@Before
@@ -89,7 +87,7 @@ public class PubsubApiaryClientTest {
.setPublishTime(String.valueOf(PUB_TIME))
.setAttributes(
ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, CUSTOM_ID));
+ ID_LABEL, RECORD_ID));
ReceivedMessage expectedReceivedMessage =
new ReceivedMessage().setMessage(expectedPubsubMessage)
.setAckId(ACK_ID);
@@ -105,7 +103,7 @@ public class PubsubApiaryClientTest {
IncomingMessage actualMessage = acutalMessages.get(0);
assertEquals(ACK_ID, actualMessage.ackId);
assertEquals(DATA, new String(actualMessage.elementBytes));
- assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+ assertEquals(RECORD_ID, actualMessage.recordId);
assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
}
@@ -117,7 +115,7 @@ public class PubsubApiaryClientTest {
.encodeData(DATA.getBytes())
.setAttributes(
ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, CUSTOM_ID));
+ ID_LABEL, RECORD_ID));
PublishRequest expectedRequest = new PublishRequest()
.setMessages(ImmutableList.of(expectedPubsubMessage));
PublishResponse expectedResponse = new PublishResponse()
@@ -127,7 +125,7 @@ public class PubsubApiaryClientTest {
.publish(expectedTopic, expectedRequest)
.execute())
.thenReturn(expectedResponse);
- OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+ OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
index 189049c..71ee27c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.util.PubsubClient.TopicPath;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PublishRequest;
@@ -70,8 +69,7 @@ public class PubsubGrpcClientTest {
private static final String ID_LABEL = "id";
private static final String MESSAGE_ID = "testMessageId";
private static final String DATA = "testData";
- private static final String CUSTOM_ID =
- Hashing.murmur3_128().hashBytes(DATA.getBytes()).toString();
+ private static final String RECORD_ID = "testRecordId";
private static final String ACK_ID = "testAckId";
@Before
@@ -118,7 +116,7 @@ public class PubsubGrpcClientTest {
.putAllAttributes(
ImmutableMap.of(TIMESTAMP_LABEL,
String.valueOf(MESSAGE_TIME),
- ID_LABEL, CUSTOM_ID))
+ ID_LABEL, RECORD_ID))
.build();
ReceivedMessage expectedReceivedMessage =
ReceivedMessage.newBuilder()
@@ -136,7 +134,7 @@ public class PubsubGrpcClientTest {
IncomingMessage actualMessage = acutalMessages.get(0);
assertEquals(ACK_ID, actualMessage.ackId);
assertEquals(DATA, new String(actualMessage.elementBytes));
- assertEquals(CUSTOM_ID, new String(actualMessage.recordId));
+ assertEquals(RECORD_ID, actualMessage.recordId);
assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
}
@@ -149,7 +147,7 @@ public class PubsubGrpcClientTest {
.setData(ByteString.copyFrom(DATA.getBytes()))
.putAllAttributes(
ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME),
- ID_LABEL, CUSTOM_ID))
+ ID_LABEL, RECORD_ID))
.build();
PublishRequest expectedRequest =
PublishRequest.newBuilder()
@@ -163,7 +161,7 @@ public class PubsubGrpcClientTest {
.build();
Mockito.when(mockPublisherStub.publish(expectedRequest))
.thenReturn(expectedResponse);
- OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
+ OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID);
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26941f15/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
index fedc8bf..d788f10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubTestClientTest.java
@@ -61,7 +61,7 @@ public class PubsubTestClientTest {
}
};
IncomingMessage expectedIncomingMessage =
- new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID.getBytes());
+ new IncomingMessage(DATA.getBytes(), MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID);
try (PubsubTestClientFactory factory =
PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S,
Lists.newArrayList(expectedIncomingMessage))) {
@@ -99,9 +99,13 @@ public class PubsubTestClientTest {
@Test
public void publishOneMessage() throws IOException {
- OutgoingMessage expectedOutgoingMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME);
- try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish(TOPIC, Sets
- .newHashSet(expectedOutgoingMessage))) {
+ OutgoingMessage expectedOutgoingMessage =
+ new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, MESSAGE_ID);
+ try (PubsubTestClientFactory factory =
+ PubsubTestClient.createFactoryForPublish(
+ TOPIC,
+ Sets.newHashSet(expectedOutgoingMessage),
+ ImmutableList.<OutgoingMessage>of())) {
try (PubsubTestClient client = (PubsubTestClient) factory.newClient(null, null, null)) {
client.publish(TOPIC, ImmutableList.of(expectedOutgoingMessage));
}