You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 01:02:28 UTC
[3/6] beam git commit: Requires specifying a Coder on PCollection.createPrimitiveOutputInternal
Requires specifying a Coder on PCollection.createPrimitiveOutputInternal
The coder can still be null, in which case it is left unspecified.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bb1bf3c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bb1bf3c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bb1bf3c1
Branch: refs/heads/master
Commit: bb1bf3c19ca0baa2c04cec9863bfcaca2024f94e
Parents: e017a0e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 26 17:14:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Aug 3 15:40:46 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 5 +-
.../FlattenPCollectionTranslator.java | 15 +-
.../apex/translation/ParDoTranslator.java | 7 +-
.../translation/ApexGroupByKeyOperatorTest.java | 6 +-
.../construction/PCollectionTranslation.java | 8 +-
.../construction/PTransformReplacements.java | 6 +
.../core/construction/PrimitiveCreate.java | 14 +-
.../core/construction/SplittableParDo.java | 3 +
.../UnboundedReadFromBoundedSource.java | 5 -
.../construction/PTransformMatchersTest.java | 79 +++++-----
.../construction/ReplacementOutputsTest.java | 14 +-
.../core/GroupByKeyViaGroupByKeyOnly.java | 15 +-
.../core/SplittableParDoViaKeyedWorkItems.java | 9 +-
.../beam/runners/direct/DirectGroupByKey.java | 31 ++--
.../direct/ParDoMultiOverrideFactory.java | 3 +
.../direct/TestStreamEvaluatorFactory.java | 8 +-
.../runners/direct/ViewOverrideFactory.java | 5 +-
.../runners/direct/CommittedResultTest.java | 26 +++-
.../runners/direct/EvaluationContextTest.java | 8 +-
.../runners/flink/CreateStreamingFlinkView.java | 5 +-
.../beam/runners/dataflow/AssignWindows.java | 4 +-
.../runners/dataflow/BatchViewOverrides.java | 19 +--
.../runners/dataflow/CreateDataflowView.java | 5 +-
.../beam/runners/dataflow/DataflowRunner.java | 147 ++++++++++---------
.../dataflow/PrimitiveParDoSingleFactory.java | 12 +-
.../DataflowPipelineTranslatorTest.java | 11 +-
.../runners/dataflow/DataflowRunnerTest.java | 11 +-
.../transforms/DataflowGroupByKeyTest.java | 12 +-
.../dataflow/transforms/DataflowViewTest.java | 14 +-
.../beam/runners/spark/io/CreateStream.java | 11 +-
.../translation/StorageLevelPTransform.java | 10 +-
.../util/SinglePrimitiveOutputPTransform.java | 51 -------
.../main/java/org/apache/beam/sdk/io/Read.java | 21 ++-
.../org/apache/beam/sdk/testing/TestStream.java | 5 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 22 +--
.../apache/beam/sdk/transforms/GroupByKey.java | 12 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 2 +
.../org/apache/beam/sdk/transforms/View.java | 5 +-
.../beam/sdk/transforms/windowing/Window.java | 2 +-
.../org/apache/beam/sdk/values/PCollection.java | 9 +-
.../beam/sdk/values/PCollectionTuple.java | 10 +-
.../sdk/runners/TransformHierarchyTest.java | 41 +++---
.../beam/sdk/runners/TransformTreeTest.java | 12 +-
.../apache/beam/sdk/transforms/FlattenTest.java | 2 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 11 +-
.../apache/beam/sdk/transforms/ViewTest.java | 11 +-
.../beam/sdk/values/PCollectionTupleTest.java | 7 +-
47 files changed, 357 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index fd0a1c9..cee524e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -227,9 +227,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 440b801..189cb65 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -110,8 +110,12 @@ class FlattenPCollectionTranslator<T> implements
}
if (collections.size() > 2) {
- PCollection<T> intermediateCollection = intermediateCollection(collection,
- collection.getCoder());
+ PCollection<T> intermediateCollection =
+ PCollection.createPrimitiveOutputInternal(
+ collection.getPipeline(),
+ collection.getWindowingStrategy(),
+ collection.isBounded(),
+ collection.getCoder());
context.addOperator(operator, operator.out, intermediateCollection);
remainingCollections.add(intermediateCollection);
} else {
@@ -135,11 +139,4 @@ class FlattenPCollectionTranslator<T> implements
}
}
- static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
- PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
- input.getWindowingStrategy(), input.isBounded());
- output.setCoder(outputCoder);
- return output;
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index e46687a..be11b02 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -241,8 +241,11 @@ class ParDoTranslator<InputT, OutputT>
}
PCollection<Object> resultCollection =
- FlattenPCollectionTranslator.intermediateCollection(
- firstSideInput, firstSideInput.getCoder());
+ PCollection.createPrimitiveOutputInternal(
+ firstSideInput.getPipeline(),
+ firstSideInput.getWindowingStrategy(),
+ firstSideInput.isBounded(),
+ firstSideInput.getCoder());
FlattenPCollectionTranslator.flattenCollections(
sourceCollections, unionTags, resultCollection, context);
return resultCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index 206b430..63a218b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -59,9 +59,9 @@ public class ApexGroupByKeyOperatorTest {
WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
Duration.standardSeconds(10)));
- PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline,
- ws, IsBounded.BOUNDED);
- input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+ PCollection<KV<String, Integer>> input =
+ PCollection.createPrimitiveOutputInternal(
+ pipeline, ws, IsBounded.BOUNDED, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
input, new ApexStateInternals.ApexStateBackend()
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index c0a5acf..c256e4c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -52,10 +52,10 @@ public class PCollectionTranslation {
Coder<?> coder = components.getCoder(pCollection.getCoderId());
return PCollection.createPrimitiveOutputInternal(
- pipeline,
- components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
- fromProto(pCollection.getIsBounded()))
- .setCoder((Coder) coder);
+ pipeline,
+ components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
+ fromProto(pCollection.getIsBounded()),
+ (Coder) coder);
}
public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
index 706a956..35bad15 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformReplacements.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Iterables;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -66,4 +67,9 @@ public class PTransformReplacements {
ignoredTags);
return mainInput;
}
+
+ public static <T> PCollection<T> getSingletonMainOutput(
+ AppliedPTransform<?, PCollection<T>, ? extends PTransform<?, PCollection<T>>> transform) {
+ return ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
index f43d23b..62b6d0a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
@@ -18,7 +18,9 @@
package org.apache.beam.runners.core.construction;
+import com.google.common.collect.Iterables;
import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -36,15 +38,17 @@ import org.apache.beam.sdk.values.WindowingStrategy;
*/
public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
private final Create.Values<T> transform;
+ private final Coder<T> coder;
- private PrimitiveCreate(Create.Values<T> transform) {
+ private PrimitiveCreate(Create.Values<T> transform, Coder<T> coder) {
this.transform = transform;
+ this.coder = coder;
}
@Override
public PCollection<T> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED, coder);
}
public Iterable<T> getElements() {
@@ -60,7 +64,11 @@ public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<T>, Values<T>> transform) {
return PTransformReplacement.of(
- transform.getPipeline().begin(), new PrimitiveCreate<T>(transform.getTransform()));
+ transform.getPipeline().begin(),
+ new PrimitiveCreate<T>(
+ transform.getTransform(),
+ ((PCollection<T>) Iterables.getOnlyElement(transform.getOutputs().values()))
+ .getCoder()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index e71187b..bcc5de8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -273,6 +274,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+ // TODO
+ Collections.<TupleTag<?>, Coder<?>>emptyMap(),
windowingStrategy,
input.isBounded().and(signature.isBoundedPerElement()));
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index f35f4c3..55f9519 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -91,11 +91,6 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public String getKindString() {
return String.format("Read(%s)", NameUtils.approximateSimpleName(source));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 1862699..fa7e1e9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -100,15 +100,16 @@ public class PTransformMatchersTest implements Serializable {
private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
PCollection<KV<String, Integer>> input =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
input.setName("dummy input");
- input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
PCollection<Integer> output =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
output.setName("dummy output");
- output.setCoder(VarIntCoder.of());
return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
}
@@ -133,7 +134,7 @@ public class PTransformMatchersTest implements Serializable {
@Override
public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), VarIntCoder.of());
}
}
PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
@@ -425,14 +426,14 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"EmptyFlatten",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
@@ -442,17 +443,17 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"Flatten",
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -462,15 +463,15 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+ .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
"EmptyFlatten",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.iterables() /* This isn't actually possible to construct,
- * but for the sake of example */,
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ /* This isn't actually possible to construct, but for the sake of example */
+ Flatten.<Integer>iterables(),
p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
@@ -480,17 +481,17 @@ public class PTransformMatchersTest implements Serializable {
public void flattenWithDuplicateInputsWithoutDuplicates() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"Flatten",
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
@@ -498,22 +499,22 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void flattenWithDuplicateInputsWithDuplicates() {
- PCollection<Object> duplicate =
+ PCollection<Integer> duplicate =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ .<PCollectionList<Integer>, PCollection<Integer>, Flatten.PCollections<Integer>>of(
"Flatten",
ImmutableMap.<TupleTag<?>, PValue>builder()
- .put(new TupleTag<Object>(), duplicate)
- .put(new TupleTag<Object>(), duplicate)
+ .put(new TupleTag<Integer>(), duplicate)
+ .put(new TupleTag<Integer>(), duplicate)
.build(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.pCollections(),
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ Flatten.<Integer>pCollections(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(true));
@@ -523,15 +524,15 @@ public class PTransformMatchersTest implements Serializable {
public void flattenWithDuplicateInputsNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+ .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
"EmptyFlatten",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>singletonMap(
- new TupleTag<Object>(),
+ new TupleTag<Integer>(),
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
- Flatten.iterables() /* This isn't actually possible to construct,
- * but for the sake of example */,
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
+ /* This isn't actually possible to construct, but for the sake of example */
+ Flatten.<Integer>iterables(),
p);
assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
index f8d01e9..0165e4b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.util.Map;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
@@ -50,23 +52,23 @@ public class ReplacementOutputsTest {
private PCollection<Integer> ints =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<Integer> moreInts =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<String> strs =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
private PCollection<Integer> replacementInts =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<Integer> moreReplacementInts =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
private PCollection<String> replacementStrs =
PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, StringUtf8Coder.of());
@Test
public void singletonSucceeds() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index fca3c76..1fdf07c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -111,12 +111,10 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
@Override
public PCollection<KV<K, Iterable<WindowedValue<V>>>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- }
-
- @Override
- public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return GroupByKey.getOutputKvCoder(input.getCoder());
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.isBounded(),
+ (Coder) GroupByKey.getOutputKvCoder(input.getCoder()));
}
}
@@ -244,9 +242,8 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), windowingStrategy, input.isBounded())
- .setCoder(outputKvCoder);
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), windowingStrategy, input.isBounded(), outputKvCoder);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 6e97645..af720fd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -72,8 +72,15 @@ public class SplittableParDoViaKeyedWorkItems {
PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
@Override
public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
+ KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded(),
+ KeyedWorkItemCoder.of(
+ kvCoder.getKeyCoder(),
+ kvCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 06b8e29..3ba04e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -78,22 +77,18 @@ class DirectGroupByKey<K, V>
@Override
public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded(),
+ KeyedWorkItemCoder.of(
+ GroupByKey.getKeyCoder(input.getCoder()),
+ GroupByKey.getInputValueCoder(input.getCoder()),
+ input.getWindowingStrategy().getWindowFn().windowCoder()));
}
DirectGroupByKeyOnly() {}
@Override
- protected Coder<?> getDefaultOutputCoder(
- @SuppressWarnings("unused") PCollection<KV<K, V>> input)
- throws CannotProvideCoderException {
- return KeyedWorkItemCoder.of(
- GroupByKey.getKeyCoder(input.getCoder()),
- GroupByKey.getInputValueCoder(input.getCoder()),
- input.getWindowingStrategy().getWindowFn().windowCoder());
- }
-
- @Override
public String getUrn() {
return DIRECT_GBKO_URN;
}
@@ -135,17 +130,11 @@ class DirectGroupByKey<K, V>
}
@Override
- protected Coder<?> getDefaultOutputCoder(
- @SuppressWarnings("unused") PCollection<KeyedWorkItem<K, V>> input)
- throws CannotProvideCoderException {
- KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
- return KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder()));
- }
-
- @Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KeyedWorkItem<K, V>> input) {
+ KeyedWorkItemCoder<K, V> inputCoder = getKeyedWorkItemCoder(input.getCoder());
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), outputWindowingStrategy, input.isBounded());
+ input.getPipeline(), outputWindowingStrategy, input.isBounded(),
+ KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getElementCoder())));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 891d102..3f04b56 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
@@ -248,6 +249,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(getMainOutputTag()).and(getAdditionalOutputTags().getAll()),
+ // TODO
+ Collections.<TupleTag<?>, Coder<?>>emptyMap(),
input.getWindowingStrategy(),
input.isBounded());
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 16c8589..49e7be7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -207,9 +207,11 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public PCollection<T> expand(PBegin input) {
runner.setClockSupplier(new TestClockSupplier());
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(original.getValueCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.UNBOUNDED,
+ original.getValueCoder());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 5dcf016..c2255fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -115,9 +115,8 @@ class ViewOverrideFactory<ElemT, ViewT>
@Override
@SuppressWarnings("deprecation")
public PCollection<Iterable<ElemT>> expand(PCollection<Iterable<ElemT>> input) {
- return PCollection.<Iterable<ElemT>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 8b95b34..29ed55d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -113,13 +114,24 @@ public class CommittedResultTest implements Serializable {
@Test
public void getOutputsEqualInput() {
- List<? extends CommittedBundle<?>> outputs =
- ImmutableList.of(bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
- bundleFactory.createBundle(PCollection.createPrimitiveOutputInternal(p,
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
+ List<? extends CommittedBundle<Integer>> outputs =
+ ImmutableList.of(
+ bundleFactory
+ .createBundle(
+ PCollection.createPrimitiveOutputInternal(
+ p,
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED,
+ VarIntCoder.of()))
+ .commit(Instant.now()),
+ bundleFactory
+ .createBundle(
+ PCollection.createPrimitiveOutputInternal(
+ p,
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ VarIntCoder.of()))
+ .commit(Instant.now()));
CommittedResult result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 699a318..cc9ce60 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
@@ -127,8 +128,11 @@ public class EvaluationContextTest {
public void writeToViewWriterThenReadReads() {
PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
context.createPCollectionViewWriter(
- PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
+ PCollection.createPrimitiveOutputInternal(
+ p,
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ IterableCoder.of(VarIntCoder.of())),
view);
BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index 0cc3aec..3114a6f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -120,9 +120,8 @@ class CreateStreamingFlinkView<ElemT, ViewT>
@Override
public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
- return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
index 572b005..d015d2b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -59,8 +59,8 @@ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
transform.getOutputStrategyInternal(input.getWindowingStrategy());
if (transform.getWindowFn() != null) {
// If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), outputStrategy, input.isBounded());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), outputStrategy, input.isBounded(), input.getCoder());
} else {
// If the windowFn didn't change, we just run a pass-through transform and then set the
// new windowing strategy.
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ad3faed..9a77b4b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -1258,18 +1258,13 @@ class BatchViewOverrides {
@Override
public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1, KV<K2, V>>> input) {
- PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
- PCollection.<KV<K1, Iterable<KV<K2, V>>>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- IsBounded.BOUNDED);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder) input.getCoder();
- rval.setCoder(
- KvCoder.of(inputCoder.getKeyCoder(),
- IterableCoder.of(inputCoder.getValueCoder())));
- return rval;
+ @SuppressWarnings("unchecked")
+ KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder<K1, KV<K2, V>>) input.getCoder();
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index caad7f8..3b01d69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -37,9 +37,8 @@ public class CreateDataflowView<ElemT, ViewT>
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8fce5b4..6999616 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -321,7 +321,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
- new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this)));
+ new StreamingPubsubIOReadOverrideFactory()));
}
if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
overridesBuilder.add(
@@ -359,11 +359,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// must precede it
PTransformOverride.of(
PTransformMatchers.classEqualTo(Read.Bounded.class),
- new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this)))
+ new StreamingBoundedReadOverrideFactory()))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(Read.Unbounded.class),
- new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
+ new StreamingUnboundedReadOverrideFactory()))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
@@ -448,38 +448,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- private static class ReflectiveRootOverrideFactory<T>
- implements PTransformOverrideFactory<
- PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> {
- private final Class<PTransform<PBegin, PCollection<T>>> replacement;
- private final DataflowRunner runner;
-
- private ReflectiveRootOverrideFactory(
- Class<PTransform<PBegin, PCollection<T>>> replacement, DataflowRunner runner) {
- this.replacement = replacement;
- this.runner = runner;
- }
-
- @Override
- public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
- AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
- PTransform<PInput, PCollection<T>> original = transform.getTransform();
- return PTransformReplacement.of(
- transform.getPipeline().begin(),
- InstanceBuilder.ofType(replacement)
- .withArg(DataflowRunner.class, runner)
- .withArg(
- (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
- .build());
- }
-
- @Override
- public Map<PValue, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
- return ReplacementOutputs.singleton(outputs, newOutput);
- }
- }
-
private String debuggerMessage(String projectId, String uniquifier) {
return String.format("To debug your job, visit Google Cloud Debugger at: "
+ "https://console.developers.google.com/debug?project=%s&dbgee=%s",
@@ -838,6 +806,24 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// PubsubIO translations
// ================================================================================
+ private static class StreamingPubsubIOReadOverrideFactory
+ implements PTransformOverrideFactory<
+ PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new StreamingPubsubIORead(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<PubsubMessage> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+
/**
* Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can
* instead defer to Windmill's implementation.
@@ -846,9 +832,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
extends PTransform<PBegin, PCollection<PubsubMessage>> {
private final PubsubUnboundedSource transform;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) {
+ public StreamingPubsubIORead(PubsubUnboundedSource transform) {
this.transform = transform;
}
@@ -858,9 +842,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public PCollection<PubsubMessage> expand(PBegin input) {
- return PCollection.<PubsubMessage>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(new PubsubMessageWithAttributesCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.UNBOUNDED,
+ new PubsubMessageWithAttributesCoder());
}
@Override
@@ -1129,12 +1115,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public PCollection<byte[]> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), isBounded);
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder() {
- return ByteArrayCoder.of();
+ input.getPipeline(), WindowingStrategy.globalDefault(), isBounded, ByteArrayCoder.of());
}
private static class Translator implements TransformTranslator<Impulse> {
@@ -1157,6 +1138,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
+ private static class StreamingUnboundedReadOverrideFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Unbounded<T>> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, Read.Unbounded<T>> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new StreamingUnboundedRead<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
/**
* Specialized implementation for
* {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the
@@ -1168,18 +1165,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static class StreamingUnboundedRead<T> extends PTransform<PBegin, PCollection<T>> {
private final UnboundedSource<T, ?> source;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) {
+ public StreamingUnboundedRead(Read.Unbounded<T> transform) {
this.source = transform.getSource();
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public final PCollection<T> expand(PBegin input) {
source.validate();
@@ -1206,13 +1196,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public final PCollection<ValueWithRecordId<T>> expand(PInput input) {
- return PCollection.<ValueWithRecordId<T>>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
- }
-
- @Override
- protected Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
- return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED,
+ ValueWithRecordId.ValueWithRecordIdCoder.of(source.getOutputCoder()));
}
@Override
@@ -1276,6 +1262,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
+ private static class StreamingBoundedReadOverrideFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>, Read.Bounded<T>> {
+ @Override
+ public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+ AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
+ return PTransformReplacement.of(
+ transform.getPipeline().begin(), new StreamingBoundedRead<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
/**
* Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the
* Dataflow runner in streaming mode.
@@ -1283,18 +1285,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
private static class StreamingBoundedRead<T> extends PTransform<PBegin, PCollection<T>> {
private final BoundedSource<T> source;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
+ public StreamingBoundedRead(Read.Bounded<T> transform) {
this.source = transform.getSource();
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public final PCollection<T> expand(PBegin input) {
source.validate();
@@ -1404,15 +1399,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
static class CombineGroupedValues<K, InputT, OutputT>
extends PTransform<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
private final Combine.GroupedValues<K, InputT, OutputT> original;
+ private final Coder<KV<K, OutputT>> outputCoder;
- CombineGroupedValues(GroupedValues<K, InputT, OutputT> original) {
+ CombineGroupedValues(
+ GroupedValues<K, InputT, OutputT> original, Coder<KV<K, OutputT>> outputCoder) {
this.original = original;
+ this.outputCoder = outputCoder;
}
@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<InputT>>> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+ outputCoder);
}
public Combine.GroupedValues<K, InputT, OutputT> getOriginalCombine() {
@@ -1433,7 +1432,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new CombineGroupedValues<>(transform.getTransform()));
+ new CombineGroupedValues<>(
+ transform.getTransform(),
+ PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 8611d3c..9252c64 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.DisplayData;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -49,7 +50,9 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new ParDoSingle<>(transform.getTransform()));
+ new ParDoSingle<>(
+ transform.getTransform(),
+ PTransformReplacements.getSingletonMainOutput(transform).getCoder()));
}
/**
@@ -58,15 +61,18 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
public static class ParDoSingle<InputT, OutputT>
extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
private final ParDo.SingleOutput<InputT, OutputT> original;
+ private final Coder<OutputT> outputCoder;
- private ParDoSingle(ParDo.SingleOutput<InputT, OutputT> original) {
+ private ParDoSingle(SingleOutput<InputT, OutputT> original, Coder<OutputT> outputCoder) {
this.original = original;
+ this.outputCoder = outputCoder;
}
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(),
+ outputCoder);
}
public DoFn<InputT, OutputT> getFn() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 9a0bdf8..7a99f75 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -650,10 +650,13 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Fails here when attempting to construct a tuple with an unbound object.
return PCollectionTuple.of(sumTag, sum)
- .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- input.isBounded()));
+ .and(
+ doneTag,
+ PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ input.isBounded(),
+ VoidCoder.of()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 9db73c6..55264a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -72,7 +72,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOption
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -953,15 +952,11 @@ public class DataflowRunnerTest implements Serializable {
@Override
public PCollection<Integer> expand(PCollection<Integer> input) {
- return PCollection.<Integer>createPrimitiveOutputInternal(
+ return PCollection.createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
- input.isBounded());
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) {
- return input.getCoder();
+ input.isBounded(),
+ input.getCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index 737b408..c198ebf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.junit.Before;
@@ -105,11 +105,11 @@ public class DataflowGroupByKeyTest {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index dea96b9..e2e42a6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -21,6 +21,9 @@ import com.google.api.services.dataflow.Dataflow;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
@@ -33,7 +36,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
@@ -94,11 +96,11 @@ public class DataflowViewTest {
new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
@Override
public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED)
- .setTypeDescriptor(new TypeDescriptor<KV<String, Integer>>() {});
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
}
})
.apply(view);
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index fdcea99..d485d25 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -203,11 +202,9 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
@Override
public PCollection<T> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() throws CannotProvideCoderException {
- return coder;
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.UNBOUNDED,
+ coder);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
index 0ecfa75..b236ce7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.spark.translation;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
@@ -32,12 +31,7 @@ public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCo
public PCollection<String> expand(PCollection<?> input) {
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED);
+ PCollection.IsBounded.BOUNDED,
+ StringUtf8Coder.of());
}
-
- @Override
- public Coder getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
deleted file mode 100644
index 299f5ba..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.util;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * A {@link PTransform} wrapping another transform.
- */
-public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
- private PTransform<PInput, PCollection<T>> transform;
-
- public SinglePrimitiveOutputPTransform(PTransform<PInput, PCollection<T>> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollection<T> expand(PInput input) {
- try {
- PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
- collection.setCoder(transform.getDefaultOutputCoder(input, collection));
- return collection;
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(
- "Unable to infer a coder and no Coder was specified. "
- + "Please set a coder by invoking Create.withCoder() explicitly.",
- e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 6e6750d..574ba0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -95,17 +95,14 @@ public class Read {
}
@Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getOutputCoder();
- }
-
- @Override
public final PCollection<T> expand(PBegin input) {
source.validate();
- return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
- WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
- .setCoder(getDefaultOutputCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED,
+ source.getOutputCoder());
}
/**
@@ -170,9 +167,11 @@ public class Read {
@Override
public final PCollection<T> expand(PBegin input) {
source.validate();
-
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.UNBOUNDED,
+ source.getOutputCoder());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index d13fcf1..45f4413 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -253,9 +253,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
@Override
public PCollection<T> expand(PBegin input) {
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
- .setCoder(coder);
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder);
}
public Coder<T> getValueCoder() {
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 25d9c05..8247a58 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -129,25 +128,12 @@ public class Flatten {
windowingStrategy = WindowingStrategy.globalDefault();
}
- return PCollection.<T>createPrimitiveOutputInternal(
+ return PCollection.createPrimitiveOutputInternal(
inputs.getPipeline(),
windowingStrategy,
- isBounded);
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollectionList<T> input)
- throws CannotProvideCoderException {
-
- // Take coder from first collection
- for (PCollection<T> pCollection : input.getAll()) {
- return pCollection.getCoder();
- }
-
- // No inputs
- throw new CannotProvideCoderException(
- this.getClass().getSimpleName() + " cannot provide a Coder for"
- + " empty " + PCollectionList.class.getSimpleName());
+ isBounded,
+ // Take coder from first collection. If there are none, will be left unspecified.
+ inputs.getAll().isEmpty() ? null : inputs.get(0).getCoder());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 7516b25..3cb0d23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -217,13 +217,11 @@ public class GroupByKey<K, V>
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
- return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
- updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded());
- }
-
- @Override
- protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return getOutputKvCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(),
+ updateWindowingStrategy(input.getWindowingStrategy()),
+ input.isBounded(),
+ getOutputKvCoder(input.getCoder()));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 0d03835..bc4f629 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -767,6 +767,8 @@ public class ParDo {
PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
+ // TODO
+ Collections.<TupleTag<?>, Coder<?>>emptyMap(),
input.getWindowingStrategy(),
input.isBounded());
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 57dccbc..f6f3af5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -509,9 +509,8 @@ public class View {
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/bb1bf3c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index a12be6d..af583e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -484,7 +484,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
@Override
public PCollection<T> expand(PCollection<T> input) {
return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), updatedStrategy, input.isBounded());
+ input.getPipeline(), updatedStrategy, input.isBounded(), input.getCoder());
}
@Override