You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/07 20:48:13 UTC
[2/3] beam git commit: This closes #2286
This closes #2286
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f7e73bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f7e73bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f7e73bb
Branch: refs/heads/master
Commit: 5f7e73bbacf7096eed44002a54910a560b195801
Parents: b2de3db 8f4fa43
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jun 7 19:43:19 2017 +0200
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jun 7 13:41:20 2017 -0700
----------------------------------------------------------------------
.../translation/types/CoderTypeSerializer.java | 41 ++-
.../streaming/io/UnboundedSourceWrapper.java | 2 +
.../flink/streaming/TestCountingSource.java | 48 ++-
.../streaming/UnboundedSourceWrapperTest.java | 309 +++++++++++--------
.../beam/runners/dataflow/DataflowRunner.java | 24 +-
5 files changed, 269 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5f7e73bb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --cc runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index cce6ce7,cce6ce7..ed29330
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@@ -428,12 -428,12 +428,15 @@@ public class DataflowRunner extends Pip
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
PTransform<PInput, PCollection<T>> original = transform.getTransform();
++ PCollection<T> output =
++ (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
return PTransformReplacement.of(
transform.getPipeline().begin(),
InstanceBuilder.ofType(replacement)
.withArg(DataflowRunner.class, runner)
.withArg(
(Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
++ .withArg((Class<? super PCollection<T>>) output.getClass(), output)
.build());
}
@@@ -809,11 -809,11 +812,12 @@@
extends PTransform<PBegin, PCollection<PubsubMessage>> {
private final PubsubUnboundedSource transform;
-- /**
-- * Builds an instance of this class from the overridden transform.
-- */
++ /** Builds an instance of this class from the overridden transform. */
++ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
public StreamingPubsubIORead(
-- DataflowRunner runner, PubsubUnboundedSource transform) {
++ DataflowRunner runner,
++ PubsubUnboundedSource transform,
++ PCollection<PubsubMessage> originalOutput) {
this.transform = transform;
}
@@@ -992,11 -992,11 +996,11 @@@
private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> {
private final UnboundedSource<T, ?> source;
-- /**
-- * Builds an instance of this class from the overridden transform.
-- */
++ /** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-- public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
++ public StreamingUnboundedRead(DataflowRunner runner,
++ Read.Unbounded<T> transform,
++ PCollection<T> originalOutput) {
this.source = transform.getSource();
}
@@@ -1111,7 -1111,7 +1115,9 @@@
/** Builds an instance of this class from the overridden transform. */
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-- public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
++ public StreamingBoundedRead(DataflowRunner runner,
++ Read.Bounded<T> transform,
++ PCollection<T> originalOutput) {
this.source = transform.getSource();
}